Whamcloud - gitweb
LU-3259 clio: cl_lock simplification
[fs/lustre-release.git] / lustre / obdclass / cl_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * Client Lustre Object.
37  *
38  *   Author: Nikita Danilov <nikita.danilov@sun.com>
39  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
40  */
41
42 /*
43  * Locking.
44  *
45  *  i_mutex
46  *      PG_locked
47  *          ->coh_attr_guard
48  *          ->ls_guard
49  */
50
51 #define DEBUG_SUBSYSTEM S_CLASS
52
53 #include <libcfs/libcfs.h>
54 /* class_put_type() */
55 #include <obd_class.h>
56 #include <obd_support.h>
57 #include <lustre_fid.h>
58 #include <libcfs/list.h>
59 #include <libcfs/libcfs_hash.h> /* for cfs_hash stuff */
60 #include <cl_object.h>
61 #include "cl_internal.h"
62
63 static struct kmem_cache *cl_env_kmem;
64
65 /** Lock class of cl_object_header::coh_attr_guard */
66 static struct lock_class_key cl_attr_guard_class;
67
68 extern __u32 lu_context_tags_default;
69 extern __u32 lu_session_tags_default;
70 /**
71  * Initialize cl_object_header.
72  */
73 int cl_object_header_init(struct cl_object_header *h)
74 {
75         int result;
76
77         ENTRY;
78         result = lu_object_header_init(&h->coh_lu);
79         if (result == 0) {
80                 spin_lock_init(&h->coh_attr_guard);
81                 lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
82                 h->coh_page_bufsize = 0;
83         }
84         RETURN(result);
85 }
86 EXPORT_SYMBOL(cl_object_header_init);
87
88 /**
89  * Finalize cl_object_header.
90  */
91 void cl_object_header_fini(struct cl_object_header *h)
92 {
93         lu_object_header_fini(&h->coh_lu);
94 }
95 EXPORT_SYMBOL(cl_object_header_fini);
96
97 /**
98  * Returns a cl_object with a given \a fid.
99  *
100  * Returns either cached or newly created object. Additional reference on the
101  * returned object is acquired.
102  *
103  * \see lu_object_find(), cl_page_find(), cl_lock_find()
104  */
105 struct cl_object *cl_object_find(const struct lu_env *env,
106                                  struct cl_device *cd, const struct lu_fid *fid,
107                                  const struct cl_object_conf *c)
108 {
109         might_sleep();
110         return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
111 }
112 EXPORT_SYMBOL(cl_object_find);
113
114 /**
115  * Releases a reference on \a o.
116  *
117  * When last reference is released object is returned to the cache, unless
118  * lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
119  *
120  * \see cl_page_put(), cl_lock_put().
121  */
122 void cl_object_put(const struct lu_env *env, struct cl_object *o)
123 {
124         lu_object_put(env, &o->co_lu);
125 }
126 EXPORT_SYMBOL(cl_object_put);
127
128 /**
129  * Acquire an additional reference to the object \a o.
130  *
131  * This can only be used to acquire _additional_ reference, i.e., caller
132  * already has to possess at least one reference to \a o before calling this.
133  *
134  * \see cl_page_get(), cl_lock_get().
135  */
136 void cl_object_get(struct cl_object *o)
137 {
138         lu_object_get(&o->co_lu);
139 }
140 EXPORT_SYMBOL(cl_object_get);
141
142 /**
143  * Returns the top-object for a given \a o.
144  *
145  * \see cl_io_top()
146  */
147 struct cl_object *cl_object_top(struct cl_object *o)
148 {
149         struct cl_object_header *hdr = cl_object_header(o);
150         struct cl_object *top;
151
152         while (hdr->coh_parent != NULL)
153                 hdr = hdr->coh_parent;
154
155         top = lu2cl(lu_object_top(&hdr->coh_lu));
156         CDEBUG(D_TRACE, "%p -> %p\n", o, top);
157         return top;
158 }
159 EXPORT_SYMBOL(cl_object_top);
160
161 /**
162  * Returns pointer to the lock protecting data-attributes for the given object
163  * \a o.
164  *
165  * Data-attributes are protected by the cl_object_header::coh_attr_guard
166  * spin-lock in the top-object.
167  *
168  * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
169  */
170 static spinlock_t *cl_object_attr_guard(struct cl_object *o)
171 {
172         return &cl_object_header(cl_object_top(o))->coh_attr_guard;
173 }
174
175 /**
176  * Locks data-attributes.
177  *
178  * Prevents data-attributes from changing, until lock is released by
179  * cl_object_attr_unlock(). This has to be called before calls to
180  * cl_object_attr_get(), cl_object_attr_set().
181  */
182 void cl_object_attr_lock(struct cl_object *o)
183 __acquires(cl_object_attr_guard(o))
184 {
185         spin_lock(cl_object_attr_guard(o));
186 }
187 EXPORT_SYMBOL(cl_object_attr_lock);
188
189 /**
190  * Releases data-attributes lock, acquired by cl_object_attr_lock().
191  */
192 void cl_object_attr_unlock(struct cl_object *o)
193 __releases(cl_object_attr_guard(o))
194 {
195         spin_unlock(cl_object_attr_guard(o));
196 }
197 EXPORT_SYMBOL(cl_object_attr_unlock);
198
199 /**
200  * Returns data-attributes of an object \a obj.
201  *
202  * Every layer is asked (by calling cl_object_operations::coo_attr_get())
203  * top-to-bottom to fill in parts of \a attr that this layer is responsible
204  * for.
205  */
206 int cl_object_attr_get(const struct lu_env *env, struct cl_object *obj,
207                        struct cl_attr *attr)
208 {
209         struct lu_object_header *top;
210         int result;
211
212         assert_spin_locked(cl_object_attr_guard(obj));
213         ENTRY;
214
215         top = obj->co_lu.lo_header;
216         result = 0;
217         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
218                 if (obj->co_ops->coo_attr_get != NULL) {
219                         result = obj->co_ops->coo_attr_get(env, obj, attr);
220                         if (result != 0) {
221                                 if (result > 0)
222                                         result = 0;
223                                 break;
224                         }
225                 }
226         }
227         RETURN(result);
228 }
229 EXPORT_SYMBOL(cl_object_attr_get);
230
231 /**
232  * Updates data-attributes of an object \a obj.
233  *
234  * Only attributes, mentioned in a validness bit-mask \a v are
235  * updated. Calls cl_object_operations::coo_attr_set() on every layer, bottom
236  * to top.
237  */
238 int cl_object_attr_set(const struct lu_env *env, struct cl_object *obj,
239                        const struct cl_attr *attr, unsigned v)
240 {
241         struct lu_object_header *top;
242         int result;
243
244         assert_spin_locked(cl_object_attr_guard(obj));
245         ENTRY;
246
247         top = obj->co_lu.lo_header;
248         result = 0;
249         list_for_each_entry_reverse(obj, &top->loh_layers, co_lu.lo_linkage) {
250                 if (obj->co_ops->coo_attr_set != NULL) {
251                         result = obj->co_ops->coo_attr_set(env, obj, attr, v);
252                         if (result != 0) {
253                                 if (result > 0)
254                                         result = 0;
255                                 break;
256                         }
257                 }
258         }
259         RETURN(result);
260 }
261 EXPORT_SYMBOL(cl_object_attr_set);
262
263 /**
264  * Notifies layers (bottom-to-top) that glimpse AST was received.
265  *
266  * Layers have to fill \a lvb fields with information that will be shipped
267  * back to glimpse issuer.
268  *
269  * \see cl_lock_operations::clo_glimpse()
270  */
271 int cl_object_glimpse(const struct lu_env *env, struct cl_object *obj,
272                       struct ost_lvb *lvb)
273 {
274         struct lu_object_header *top;
275         int result;
276
277         ENTRY;
278         top = obj->co_lu.lo_header;
279         result = 0;
280         list_for_each_entry_reverse(obj, &top->loh_layers, co_lu.lo_linkage) {
281                 if (obj->co_ops->coo_glimpse != NULL) {
282                         result = obj->co_ops->coo_glimpse(env, obj, lvb);
283                         if (result != 0)
284                                 break;
285                 }
286         }
287         LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top),
288                          "size: "LPU64" mtime: "LPU64" atime: "LPU64" "
289                          "ctime: "LPU64" blocks: "LPU64"\n",
290                          lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
291                          lvb->lvb_ctime, lvb->lvb_blocks);
292         RETURN(result);
293 }
294 EXPORT_SYMBOL(cl_object_glimpse);
295
296 /**
297  * Updates a configuration of an object \a obj.
298  */
299 int cl_conf_set(const struct lu_env *env, struct cl_object *obj,
300                 const struct cl_object_conf *conf)
301 {
302         struct lu_object_header *top;
303         int result;
304
305         ENTRY;
306         top = obj->co_lu.lo_header;
307         result = 0;
308         list_for_each_entry(obj, &top->loh_layers, co_lu.lo_linkage) {
309                 if (obj->co_ops->coo_conf_set != NULL) {
310                         result = obj->co_ops->coo_conf_set(env, obj, conf);
311                         if (result != 0)
312                                 break;
313                 }
314         }
315         RETURN(result);
316 }
317 EXPORT_SYMBOL(cl_conf_set);
318
319 /**
320  * Prunes caches of pages and locks for this object.
321  */
322 int cl_object_prune(const struct lu_env *env, struct cl_object *obj)
323 {
324         struct lu_object_header *top;
325         struct cl_object *o;
326         int result;
327         ENTRY;
328
329         top = obj->co_lu.lo_header;
330         result = 0;
331         list_for_each_entry(o, &top->loh_layers, co_lu.lo_linkage) {
332                 if (o->co_ops->coo_prune != NULL) {
333                         result = o->co_ops->coo_prune(env, o);
334                         if (result != 0)
335                                 break;
336                 }
337         }
338
339         RETURN(result);
340 }
341 EXPORT_SYMBOL(cl_object_prune);
342
343 /**
344  * Helper function removing all object locks, and marking object for
345  * deletion. All object pages must have been deleted at this point.
346  *
347  * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
348  * and sub- objects respectively.
349  */
350 void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
351 {
352         struct cl_object_header *hdr = cl_object_header(obj);
353
354         set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
355 }
356 EXPORT_SYMBOL(cl_object_kill);
357
358 void cache_stats_init(struct cache_stats *cs, const char *name)
359 {
360         int i;
361
362         cs->cs_name = name;
363         for (i = 0; i < CS_NR; i++)
364                 atomic_set(&cs->cs_stats[i], 0);
365 }
366
367 int cache_stats_print(const struct cache_stats *cs, struct seq_file *m, int h)
368 {
369         int i;
370
371         /*
372          *   lookup    hit    total  cached create
373          * env: ...... ...... ...... ...... ......
374          */
375         if (h) {
376                 const char *names[CS_NR] = CS_NAMES;
377
378                 seq_printf(m, "%6s", " ");
379                 for (i = 0; i < CS_NR; i++)
380                         seq_printf(m, "%8s", names[i]);
381                 seq_printf(m, "\n");
382         }
383
384         seq_printf(m, "%5.5s:", cs->cs_name);
385         for (i = 0; i < CS_NR; i++)
386                 seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
387         return 0;
388 }
389
390 static void cl_env_percpu_refill(void);
391
392 /**
393  * Initialize client site.
394  *
395  * Perform common initialization (lu_site_init()), and initialize statistical
396  * counters. Also perform global initializations on the first call.
397  */
398 int cl_site_init(struct cl_site *s, struct cl_device *d)
399 {
400         int i;
401         int result;
402
403         result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
404         if (result == 0) {
405                 cache_stats_init(&s->cs_pages, "pages");
406                 for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
407                         atomic_set(&s->cs_pages_state[0], 0);
408                 cl_env_percpu_refill();
409         }
410         return result;
411 }
412 EXPORT_SYMBOL(cl_site_init);
413
414 /**
415  * Finalize client site. Dual to cl_site_init().
416  */
417 void cl_site_fini(struct cl_site *s)
418 {
419         lu_site_fini(&s->cs_lu);
420 }
421 EXPORT_SYMBOL(cl_site_fini);
422
423 static struct cache_stats cl_env_stats = {
424         .cs_name    = "envs",
425         .cs_stats = { ATOMIC_INIT(0), }
426 };
427
428 /**
429  * Outputs client site statistical counters into a buffer. Suitable for
430  * ll_rd_*()-style functions.
431  */
432 int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
433 {
434         static const char *pstate[] = {
435                 [CPS_CACHED]    = "c",
436                 [CPS_OWNED]     = "o",
437                 [CPS_PAGEOUT]   = "w",
438                 [CPS_PAGEIN]    = "r",
439                 [CPS_FREEING]   = "f"
440         };
441         int i;
442
443 /*
444        lookup    hit  total   busy create
445 pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
446 locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
447   env: ...... ...... ...... ...... ......
448  */
449         lu_site_stats_seq_print(&site->cs_lu, m);
450         cache_stats_print(&site->cs_pages, m, 1);
451         seq_printf(m, " [");
452         for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
453                 seq_printf(m, "%s: %u ", pstate[i],
454                            atomic_read(&site->cs_pages_state[i]));
455         seq_printf(m, "]\n");
456         cache_stats_print(&cl_env_stats, m, 0);
457         seq_printf(m, "\n");
458         return 0;
459 }
460 EXPORT_SYMBOL(cl_site_stats_print);
461
462 /*****************************************************************************
463  *
464  * lu_env handling on client.
465  *
466  */
467
468 /**
469  * The most efficient way is to store cl_env pointer in task specific
470  * structures. On Linux, it wont' be easy to use task_struct->journal_info
471  * because Lustre code may call into other fs which has certain assumptions
472  * about journal_info. Currently following fields in task_struct are identified
473  * can be used for this purpose:
474  *  - cl_env: for liblustre.
475  *  - tux_info: ony on RedHat kernel.
476  *  - ...
477  * \note As long as we use task_struct to store cl_env, we assume that once
478  * called into Lustre, we'll never call into the other part of the kernel
479  * which will use those fields in task_struct without explicitly exiting
480  * Lustre.
481  *
482  * If there's no space in task_struct is available, hash will be used.
483  * bz20044, bz22683.
484  */
485
486 static struct list_head cl_envs;
487 static unsigned cl_envs_cached_nr  = 0;
488 static unsigned cl_envs_cached_max = 128; /* XXX: prototype: arbitrary limit
489                                            * for now. */
490 static DEFINE_SPINLOCK(cl_envs_guard);
491
492 struct cl_env {
493         void             *ce_magic;
494         struct lu_env     ce_lu;
495         struct lu_context ce_ses;
496
497 #ifdef LL_TASK_CL_ENV
498         void             *ce_prev;
499 #else
500         /**
501          * This allows cl_env to be entered into cl_env_hash which implements
502          * the current thread -> client environment lookup.
503          */
504         struct hlist_node  ce_node;
505 #endif
506         /**
507          * Owner for the current cl_env.
508          *
509          * If LL_TASK_CL_ENV is defined, this point to the owning current,
510          * only for debugging purpose ;
511          * Otherwise hash is used, and this is the key for cfs_hash.
512          * Now current thread pid is stored. Note using thread pointer would
513          * lead to unbalanced hash because of its specific allocation locality
514          * and could be varied for different platforms and OSes, even different
515          * OS versions.
516          */
517         void             *ce_owner;
518
519         /*
520          * Linkage into global list of all client environments. Used for
521          * garbage collection.
522          */
523         struct list_head  ce_linkage;
524         /*
525          *
526          */
527         int               ce_ref;
528         /*
529          * Debugging field: address of the caller who made original
530          * allocation.
531          */
532         void             *ce_debug;
533 };
534
535 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
536 #define CL_ENV_INC(counter) atomic_inc(&cl_env_stats.cs_stats[CS_##counter])
537
538 #define CL_ENV_DEC(counter) do {                                              \
539         LASSERT(atomic_read(&cl_env_stats.cs_stats[CS_##counter]) > 0);   \
540         atomic_dec(&cl_env_stats.cs_stats[CS_##counter]);                 \
541 } while (0)
542 #else
543 #define CL_ENV_INC(counter)
544 #define CL_ENV_DEC(counter)
545 #endif
546
547 static void cl_env_init0(struct cl_env *cle, void *debug)
548 {
549         LASSERT(cle->ce_ref == 0);
550         LASSERT(cle->ce_magic == &cl_env_init0);
551         LASSERT(cle->ce_debug == NULL && cle->ce_owner == NULL);
552
553         cle->ce_ref = 1;
554         cle->ce_debug = debug;
555         CL_ENV_INC(busy);
556 }
557
558
559 #ifndef LL_TASK_CL_ENV
560 /*
561  * The implementation of using hash table to connect cl_env and thread
562  */
563
564 static cfs_hash_t *cl_env_hash;
565
566 static unsigned cl_env_hops_hash(cfs_hash_t *lh,
567                                  const void *key, unsigned mask)
568 {
569 #if BITS_PER_LONG == 64
570         return cfs_hash_u64_hash((__u64)key, mask);
571 #else
572         return cfs_hash_u32_hash((__u32)key, mask);
573 #endif
574 }
575
576 static void *cl_env_hops_obj(struct hlist_node *hn)
577 {
578         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
579
580         LASSERT(cle->ce_magic == &cl_env_init0);
581         return (void *)cle;
582 }
583
584 static int cl_env_hops_keycmp(const void *key, struct hlist_node *hn)
585 {
586         struct cl_env *cle = cl_env_hops_obj(hn);
587
588         LASSERT(cle->ce_owner != NULL);
589         return (key == cle->ce_owner);
590 }
591
592 static void cl_env_hops_noop(cfs_hash_t *hs, struct hlist_node *hn)
593 {
594         struct cl_env *cle = hlist_entry(hn, struct cl_env, ce_node);
595         LASSERT(cle->ce_magic == &cl_env_init0);
596 }
597
598 static cfs_hash_ops_t cl_env_hops = {
599         .hs_hash        = cl_env_hops_hash,
600         .hs_key         = cl_env_hops_obj,
601         .hs_keycmp      = cl_env_hops_keycmp,
602         .hs_object      = cl_env_hops_obj,
603         .hs_get         = cl_env_hops_noop,
604         .hs_put_locked  = cl_env_hops_noop,
605 };
606
607 static inline struct cl_env *cl_env_fetch(void)
608 {
609         struct cl_env *cle;
610
611         cle = cfs_hash_lookup(cl_env_hash, (void *) (long) current->pid);
612         LASSERT(ergo(cle, cle->ce_magic == &cl_env_init0));
613         return cle;
614 }
615
616 static inline void cl_env_attach(struct cl_env *cle)
617 {
618         if (cle) {
619                 int rc;
620
621                 LASSERT(cle->ce_owner == NULL);
622                 cle->ce_owner = (void *) (long) current->pid;
623                 rc = cfs_hash_add_unique(cl_env_hash, cle->ce_owner,
624                                          &cle->ce_node);
625                 LASSERT(rc == 0);
626         }
627 }
628
629 static inline void cl_env_do_detach(struct cl_env *cle)
630 {
631         void *cookie;
632
633         LASSERT(cle->ce_owner == (void *) (long) current->pid);
634         cookie = cfs_hash_del(cl_env_hash, cle->ce_owner,
635                               &cle->ce_node);
636         LASSERT(cookie == cle);
637         cle->ce_owner = NULL;
638 }
639
640 static int cl_env_store_init(void) {
641         cl_env_hash = cfs_hash_create("cl_env",
642                                       HASH_CL_ENV_BITS, HASH_CL_ENV_BITS,
643                                       HASH_CL_ENV_BKT_BITS, 0,
644                                       CFS_HASH_MIN_THETA,
645                                       CFS_HASH_MAX_THETA,
646                                       &cl_env_hops,
647                                       CFS_HASH_RW_BKTLOCK);
648         return cl_env_hash != NULL ? 0 :-ENOMEM;
649 }
650
651 static void cl_env_store_fini(void) {
652         cfs_hash_putref(cl_env_hash);
653 }
654
655 #else /* LL_TASK_CL_ENV */
656 /*
657  * The implementation of store cl_env directly in thread structure.
658  */
659
660 static inline struct cl_env *cl_env_fetch(void)
661 {
662         struct cl_env *cle;
663
664         cle = current->LL_TASK_CL_ENV;
665         if (cle && cle->ce_magic != &cl_env_init0)
666                 cle = NULL;
667         return cle;
668 }
669
670 static inline void cl_env_attach(struct cl_env *cle)
671 {
672         if (cle) {
673                 LASSERT(cle->ce_owner == NULL);
674                 cle->ce_owner = current;
675                 cle->ce_prev = current->LL_TASK_CL_ENV;
676                 current->LL_TASK_CL_ENV = cle;
677         }
678 }
679
680 static inline void cl_env_do_detach(struct cl_env *cle)
681 {
682         LASSERT(cle->ce_owner == current);
683         LASSERT(current->LL_TASK_CL_ENV == cle);
684         current->LL_TASK_CL_ENV = cle->ce_prev;
685         cle->ce_owner = NULL;
686 }
687
688 static int cl_env_store_init(void) { return 0; }
689 static void cl_env_store_fini(void) { }
690
691 #endif /* LL_TASK_CL_ENV */
692
693 static inline struct cl_env *cl_env_detach(struct cl_env *cle)
694 {
695         if (cle == NULL)
696                 cle = cl_env_fetch();
697
698         if (cle && cle->ce_owner)
699                 cl_env_do_detach(cle);
700
701         return cle;
702 }
703
704 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
705 {
706         struct lu_env *env;
707         struct cl_env *cle;
708
709         OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, GFP_NOFS);
710         if (cle != NULL) {
711                 int rc;
712
713                 INIT_LIST_HEAD(&cle->ce_linkage);
714                 cle->ce_magic = &cl_env_init0;
715                 env = &cle->ce_lu;
716                 rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
717                 if (rc == 0) {
718                         rc = lu_context_init(&cle->ce_ses,
719                                              LCT_SESSION | ses_tags);
720                         if (rc == 0) {
721                                 lu_context_enter(&cle->ce_ses);
722                                 env->le_ses = &cle->ce_ses;
723                                 cl_env_init0(cle, debug);
724                         } else
725                                 lu_env_fini(env);
726                 }
727                 if (rc != 0) {
728                         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
729                         env = ERR_PTR(rc);
730                 } else {
731                         CL_ENV_INC(create);
732                         CL_ENV_INC(total);
733                 }
734         } else
735                 env = ERR_PTR(-ENOMEM);
736         return env;
737 }
738
739 static void cl_env_fini(struct cl_env *cle)
740 {
741         CL_ENV_DEC(total);
742         lu_context_fini(&cle->ce_lu.le_ctx);
743         lu_context_fini(&cle->ce_ses);
744         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
745 }
746
747 static struct lu_env *cl_env_obtain(void *debug)
748 {
749         struct cl_env *cle;
750         struct lu_env *env;
751
752         ENTRY;
753         spin_lock(&cl_envs_guard);
754         LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
755         if (cl_envs_cached_nr > 0) {
756                 int rc;
757
758                 cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
759                 list_del_init(&cle->ce_linkage);
760                 cl_envs_cached_nr--;
761                 spin_unlock(&cl_envs_guard);
762
763                 env = &cle->ce_lu;
764                 rc = lu_env_refill(env);
765                 if (rc == 0) {
766                         cl_env_init0(cle, debug);
767                         lu_context_enter(&env->le_ctx);
768                         lu_context_enter(&cle->ce_ses);
769                 } else {
770                         cl_env_fini(cle);
771                         env = ERR_PTR(rc);
772                 }
773         } else {
774                 spin_unlock(&cl_envs_guard);
775                 env = cl_env_new(lu_context_tags_default,
776                                  lu_session_tags_default, debug);
777         }
778         RETURN(env);
779 }
780
781 static inline struct cl_env *cl_env_container(struct lu_env *env)
782 {
783         return container_of(env, struct cl_env, ce_lu);
784 }
785
786 struct lu_env *cl_env_peek(int *refcheck)
787 {
788         struct lu_env *env;
789         struct cl_env *cle;
790
791         CL_ENV_INC(lookup);
792
793         /* check that we don't go far from untrusted pointer */
794         CLASSERT(offsetof(struct cl_env, ce_magic) == 0);
795
796         env = NULL;
797         cle = cl_env_fetch();
798         if (cle != NULL) {
799                 CL_ENV_INC(hit);
800                 env = &cle->ce_lu;
801                 *refcheck = ++cle->ce_ref;
802         }
803         CDEBUG(D_OTHER, "%d@%p\n", cle ? cle->ce_ref : 0, cle);
804         return env;
805 }
806 EXPORT_SYMBOL(cl_env_peek);
807
808 /**
809  * Returns lu_env: if there already is an environment associated with the
810  * current thread, it is returned, otherwise, new environment is allocated.
811  *
812  * Allocations are amortized through the global cache of environments.
813  *
814  * \param refcheck pointer to a counter used to detect environment leaks. In
815  * the usual case cl_env_get() and cl_env_put() are called in the same lexical
816  * scope and pointer to the same integer is passed as \a refcheck. This is
817  * used to detect missed cl_env_put().
818  *
819  * \see cl_env_put()
820  */
821 struct lu_env *cl_env_get(int *refcheck)
822 {
823         struct lu_env *env;
824
825         env = cl_env_peek(refcheck);
826         if (env == NULL) {
827                 env = cl_env_obtain(__builtin_return_address(0));
828                 if (!IS_ERR(env)) {
829                         struct cl_env *cle;
830
831                         cle = cl_env_container(env);
832                         cl_env_attach(cle);
833                         *refcheck = cle->ce_ref;
834                         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
835                 }
836         }
837         return env;
838 }
839 EXPORT_SYMBOL(cl_env_get);
840
841 /**
842  * Forces an allocation of a fresh environment with given tags.
843  *
844  * \see cl_env_get()
845  */
846 struct lu_env *cl_env_alloc(int *refcheck, __u32 tags)
847 {
848         struct lu_env *env;
849
850         LASSERT(cl_env_peek(refcheck) == NULL);
851         env = cl_env_new(tags, tags, __builtin_return_address(0));
852         if (!IS_ERR(env)) {
853                 struct cl_env *cle;
854
855                 cle = cl_env_container(env);
856                 *refcheck = cle->ce_ref;
857                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
858         }
859         return env;
860 }
861 EXPORT_SYMBOL(cl_env_alloc);
862
863 static void cl_env_exit(struct cl_env *cle)
864 {
865         LASSERT(cle->ce_owner == NULL);
866         lu_context_exit(&cle->ce_lu.le_ctx);
867         lu_context_exit(&cle->ce_ses);
868 }
869
870 /**
871  * Finalizes and frees a given number of cached environments. This is done to
872  * (1) free some memory (not currently hooked into VM), or (2) release
873  * references to modules.
874  */
875 unsigned cl_env_cache_purge(unsigned nr)
876 {
877         struct cl_env *cle;
878
879         ENTRY;
880         spin_lock(&cl_envs_guard);
881         for (; !list_empty(&cl_envs) && nr > 0; --nr) {
882                 cle = container_of(cl_envs.next, struct cl_env, ce_linkage);
883                 list_del_init(&cle->ce_linkage);
884                 LASSERT(cl_envs_cached_nr > 0);
885                 cl_envs_cached_nr--;
886                 spin_unlock(&cl_envs_guard);
887
888                 cl_env_fini(cle);
889                 spin_lock(&cl_envs_guard);
890         }
891         LASSERT(equi(cl_envs_cached_nr == 0, list_empty(&cl_envs)));
892         spin_unlock(&cl_envs_guard);
893         RETURN(nr);
894 }
895 EXPORT_SYMBOL(cl_env_cache_purge);
896
897 /**
898  * Release an environment.
899  *
900  * Decrement \a env reference counter. When counter drops to 0, nothing in
901  * this thread is using environment and it is returned to the allocation
902  * cache, or freed straight away, if cache is large enough.
903  */
904 void cl_env_put(struct lu_env *env, int *refcheck)
905 {
906         struct cl_env *cle;
907
908         cle = cl_env_container(env);
909
910         LASSERT(cle->ce_ref > 0);
911         LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
912
913         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
914         if (--cle->ce_ref == 0) {
915                 CL_ENV_DEC(busy);
916                 cl_env_detach(cle);
917                 cle->ce_debug = NULL;
918                 cl_env_exit(cle);
919                 /*
920                  * Don't bother to take a lock here.
921                  *
922                  * Return environment to the cache only when it was allocated
923                  * with the standard tags.
924                  */
925                 if (cl_envs_cached_nr < cl_envs_cached_max &&
926                     (env->le_ctx.lc_tags & ~LCT_HAS_EXIT) == LCT_CL_THREAD &&
927                     (env->le_ses->lc_tags & ~LCT_HAS_EXIT) == LCT_SESSION) {
928                         spin_lock(&cl_envs_guard);
929                         list_add(&cle->ce_linkage, &cl_envs);
930                         cl_envs_cached_nr++;
931                         spin_unlock(&cl_envs_guard);
932                 } else
933                         cl_env_fini(cle);
934         }
935 }
936 EXPORT_SYMBOL(cl_env_put);
937
938 /**
939  * Declares a point of re-entrancy.
940  *
941  * \see cl_env_reexit()
942  */
943 void *cl_env_reenter(void)
944 {
945         return cl_env_detach(NULL);
946 }
947 EXPORT_SYMBOL(cl_env_reenter);
948
949 /**
950  * Exits re-entrancy.
951  */
952 void cl_env_reexit(void *cookie)
953 {
954         cl_env_detach(NULL);
955         cl_env_attach(cookie);
956 }
957 EXPORT_SYMBOL(cl_env_reexit);
958
959 /**
960  * Setup user-supplied \a env as a current environment. This is to be used to
961  * guaranteed that environment exists even when cl_env_get() fails. It is up
962  * to user to ensure proper concurrency control.
963  *
964  * \see cl_env_unplant()
965  */
966 void cl_env_implant(struct lu_env *env, int *refcheck)
967 {
968         struct cl_env *cle = cl_env_container(env);
969
970         LASSERT(cle->ce_ref > 0);
971
972         cl_env_attach(cle);
973         cl_env_get(refcheck);
974         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
975 }
976 EXPORT_SYMBOL(cl_env_implant);
977
978 /**
979  * Detach environment installed earlier by cl_env_implant().
980  */
981 void cl_env_unplant(struct lu_env *env, int *refcheck)
982 {
983         struct cl_env *cle = cl_env_container(env);
984
985         LASSERT(cle->ce_ref > 1);
986
987         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
988
989         cl_env_detach(cle);
990         cl_env_put(env, refcheck);
991 }
992 EXPORT_SYMBOL(cl_env_unplant);
993
994 struct lu_env *cl_env_nested_get(struct cl_env_nest *nest)
995 {
996         struct lu_env *env;
997
998         nest->cen_cookie = NULL;
999         env = cl_env_peek(&nest->cen_refcheck);
1000         if (env != NULL) {
1001                 if (!cl_io_is_going(env))
1002                         return env;
1003                 else {
1004                         cl_env_put(env, &nest->cen_refcheck);
1005                         nest->cen_cookie = cl_env_reenter();
1006                 }
1007         }
1008         env = cl_env_get(&nest->cen_refcheck);
1009         if (IS_ERR(env)) {
1010                 cl_env_reexit(nest->cen_cookie);
1011                 return env;
1012         }
1013
1014         LASSERT(!cl_io_is_going(env));
1015         return env;
1016 }
1017 EXPORT_SYMBOL(cl_env_nested_get);
1018
1019 void cl_env_nested_put(struct cl_env_nest *nest, struct lu_env *env)
1020 {
1021         cl_env_put(env, &nest->cen_refcheck);
1022         cl_env_reexit(nest->cen_cookie);
1023 }
1024 EXPORT_SYMBOL(cl_env_nested_put);
1025
1026 /**
1027  * Converts struct cl_attr to struct ost_lvb.
1028  *
1029  * \see cl_lvb2attr
1030  */
1031 void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
1032 {
1033         ENTRY;
1034         lvb->lvb_size   = attr->cat_size;
1035         lvb->lvb_mtime  = attr->cat_mtime;
1036         lvb->lvb_atime  = attr->cat_atime;
1037         lvb->lvb_ctime  = attr->cat_ctime;
1038         lvb->lvb_blocks = attr->cat_blocks;
1039         EXIT;
1040 }
1041 EXPORT_SYMBOL(cl_attr2lvb);
1042
1043 /**
1044  * Converts struct ost_lvb to struct cl_attr.
1045  *
1046  * \see cl_attr2lvb
1047  */
1048 void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
1049 {
1050         ENTRY;
1051         attr->cat_size   = lvb->lvb_size;
1052         attr->cat_mtime  = lvb->lvb_mtime;
1053         attr->cat_atime  = lvb->lvb_atime;
1054         attr->cat_ctime  = lvb->lvb_ctime;
1055         attr->cat_blocks = lvb->lvb_blocks;
1056         EXIT;
1057 }
1058 EXPORT_SYMBOL(cl_lvb2attr);
1059
1060 static struct cl_env cl_env_percpu[NR_CPUS];
1061
1062 static int cl_env_percpu_init(void)
1063 {
1064         struct cl_env *cle;
1065         int tags = LCT_REMEMBER | LCT_NOREF;
1066         int i, j;
1067         int rc = 0;
1068
1069         for_each_possible_cpu(i) {
1070                 struct lu_env *env;
1071
1072                 cle = &cl_env_percpu[i];
1073                 env = &cle->ce_lu;
1074
1075                 INIT_LIST_HEAD(&cle->ce_linkage);
1076                 cle->ce_magic = &cl_env_init0;
1077                 rc = lu_env_init(env, LCT_CL_THREAD | tags);
1078                 if (rc == 0) {
1079                         rc = lu_context_init(&cle->ce_ses, LCT_SESSION | tags);
1080                         if (rc == 0) {
1081                                 lu_context_enter(&cle->ce_ses);
1082                                 env->le_ses = &cle->ce_ses;
1083                         } else {
1084                                 lu_env_fini(env);
1085                         }
1086                 }
1087                 if (rc != 0)
1088                         break;
1089         }
1090         if (rc != 0) {
1091                 /* Indices 0 to i (excluding i) were correctly initialized,
1092                  * thus we must uninitialize up to i, the rest are undefined. */
1093                 for (j = 0; j < i; j++) {
1094                         cle = &cl_env_percpu[i];
1095                         lu_context_exit(&cle->ce_ses);
1096                         lu_context_fini(&cle->ce_ses);
1097                         lu_env_fini(&cle->ce_lu);
1098                 }
1099         }
1100
1101         return rc;
1102 }
1103
1104 static void cl_env_percpu_fini(void)
1105 {
1106         int i;
1107
1108         for_each_possible_cpu(i) {
1109                 struct cl_env *cle = &cl_env_percpu[i];
1110
1111                 lu_context_exit(&cle->ce_ses);
1112                 lu_context_fini(&cle->ce_ses);
1113                 lu_env_fini(&cle->ce_lu);
1114         }
1115 }
1116
1117 static void cl_env_percpu_refill(void)
1118 {
1119         int i;
1120
1121         for_each_possible_cpu(i)
1122                 lu_env_refill(&cl_env_percpu[i].ce_lu);
1123 }
1124
1125 void cl_env_percpu_put(struct lu_env *env)
1126 {
1127         struct cl_env *cle;
1128         int cpu;
1129
1130         cpu = smp_processor_id();
1131         cle = cl_env_container(env);
1132         LASSERT(cle == &cl_env_percpu[cpu]);
1133
1134         cle->ce_ref--;
1135         LASSERT(cle->ce_ref == 0);
1136
1137         CL_ENV_DEC(busy);
1138         cl_env_detach(cle);
1139         cle->ce_debug = NULL;
1140
1141         put_cpu();
1142 }
1143 EXPORT_SYMBOL(cl_env_percpu_put);
1144
1145 struct lu_env *cl_env_percpu_get()
1146 {
1147         struct cl_env *cle;
1148
1149         cle = &cl_env_percpu[get_cpu()];
1150         cl_env_init0(cle, __builtin_return_address(0));
1151
1152         cl_env_attach(cle);
1153         return &cle->ce_lu;
1154 }
1155 EXPORT_SYMBOL(cl_env_percpu_get);
1156
1157 /*****************************************************************************
1158  *
1159  * Temporary prototype thing: mirror obd-devices into cl devices.
1160  *
1161  */
1162
1163 struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
1164                                 struct lu_device_type *ldt,
1165                                 struct lu_device *next)
1166 {
1167         const char       *typename;
1168         struct lu_device *d;
1169
1170         LASSERT(ldt != NULL);
1171
1172         typename = ldt->ldt_name;
1173         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
1174         if (!IS_ERR(d)) {
1175                 int rc;
1176
1177                 if (site != NULL)
1178                         d->ld_site = site;
1179                 rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
1180                 if (rc == 0) {
1181                         lu_device_get(d);
1182                         lu_ref_add(&d->ld_reference,
1183                                    "lu-stack", &lu_site_init);
1184                 } else {
1185                         ldt->ldt_ops->ldto_device_free(env, d);
1186                         CERROR("can't init device '%s', %d\n", typename, rc);
1187                         d = ERR_PTR(rc);
1188                 }
1189         } else
1190                 CERROR("Cannot allocate device: '%s'\n", typename);
1191         return lu2cl_dev(d);
1192 }
1193 EXPORT_SYMBOL(cl_type_setup);
1194
1195 /**
1196  * Finalize device stack by calling lu_stack_fini().
1197  */
1198 void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1199 {
1200         lu_stack_fini(env, cl2lu_dev(cl));
1201 }
1202 EXPORT_SYMBOL(cl_stack_fini);
1203
1204 static struct lu_context_key cl_key;
1205
1206 struct cl_thread_info *cl_env_info(const struct lu_env *env)
1207 {
1208         return lu_context_key_get(&env->le_ctx, &cl_key);
1209 }
1210
1211 /* defines cl0_key_{init,fini}() */
1212 LU_KEY_INIT_FINI(cl0, struct cl_thread_info);
1213
1214 static void *cl_key_init(const struct lu_context *ctx,
1215                          struct lu_context_key *key)
1216 {
1217         struct cl_thread_info *info;
1218
1219         info = cl0_key_init(ctx, key);
1220         if (!IS_ERR(info)) {
1221                 int i;
1222
1223                 for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1224                         lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1225         }
1226         return info;
1227 }
1228
1229 static void cl_key_fini(const struct lu_context *ctx,
1230                         struct lu_context_key *key, void *data)
1231 {
1232         struct cl_thread_info *info;
1233         int i;
1234
1235         info = data;
1236         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i)
1237                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1238         cl0_key_fini(ctx, key, data);
1239 }
1240
1241 static void cl_key_exit(const struct lu_context *ctx,
1242                         struct lu_context_key *key, void *data)
1243 {
1244         struct cl_thread_info *info = data;
1245         int i;
1246
1247         for (i = 0; i < ARRAY_SIZE(info->clt_counters); ++i) {
1248                 LASSERT(info->clt_counters[i].ctc_nr_held == 0);
1249                 LASSERT(info->clt_counters[i].ctc_nr_used == 0);
1250                 LASSERT(info->clt_counters[i].ctc_nr_locks_acquired == 0);
1251                 LASSERT(info->clt_counters[i].ctc_nr_locks_locked == 0);
1252                 lu_ref_fini(&info->clt_counters[i].ctc_locks_locked);
1253                 lu_ref_init(&info->clt_counters[i].ctc_locks_locked);
1254         }
1255 }
1256
1257 static struct lu_context_key cl_key = {
1258         .lct_tags = LCT_CL_THREAD,
1259         .lct_init = cl_key_init,
1260         .lct_fini = cl_key_fini,
1261         .lct_exit = cl_key_exit
1262 };
1263
1264 static struct lu_kmem_descr cl_object_caches[] = {
1265         {
1266                 .ckd_cache = &cl_env_kmem,
1267                 .ckd_name  = "cl_env_kmem",
1268                 .ckd_size  = sizeof (struct cl_env)
1269         },
1270         {
1271                 .ckd_cache = NULL
1272         }
1273 };
1274
1275 /**
1276  * Global initialization of cl-data. Create kmem caches, register
1277  * lu_context_key's, etc.
1278  *
1279  * \see cl_global_fini()
1280  */
1281 int cl_global_init(void)
1282 {
1283         int result;
1284
1285         INIT_LIST_HEAD(&cl_envs);
1286
1287         result = cl_env_store_init();
1288         if (result)
1289                 return result;
1290
1291         result = lu_kmem_init(cl_object_caches);
1292         if (result)
1293                 goto out_store;
1294
1295         LU_CONTEXT_KEY_INIT(&cl_key);
1296         result = lu_context_key_register(&cl_key);
1297         if (result)
1298                 goto out_kmem;
1299
1300         result = cl_env_percpu_init();
1301         if (result)
1302                 /* no cl_env_percpu_fini on error */
1303                 goto out_context;
1304
1305         return 0;
1306
1307 out_context:
1308         lu_context_key_degister(&cl_key);
1309 out_kmem:
1310         lu_kmem_fini(cl_object_caches);
1311 out_store:
1312         cl_env_store_fini();
1313         return result;
1314 }
1315
1316 /**
1317  * Finalization of global cl-data. Dual to cl_global_init().
1318  */
1319 void cl_global_fini(void)
1320 {
1321         cl_env_percpu_fini();
1322         lu_context_key_degister(&cl_key);
1323         lu_kmem_fini(cl_object_caches);
1324         cl_env_store_fini();
1325 }