Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / obdclass / cl_object.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * Client Lustre Object.
32  *
33  *   Author: Nikita Danilov <nikita.danilov@sun.com>
34  *   Author: Jinshan Xiong <jinshan.xiong@intel.com>
35  */
36
37 /*
38  * Locking.
39  *
40  *  i_mutex
41  *      PG_locked
42  *          ->coh_attr_guard
43  *          ->ls_guard
44  */
45
46 #define DEBUG_SUBSYSTEM S_CLASS
47
48 #include <linux/list.h>
49 #include <libcfs/libcfs.h>
50 #include <obd_class.h>
51 #include <obd_support.h>
52 #include <lustre_fid.h>
53 #include <cl_object.h>
54 #include <lu_object.h>
55 #include "cl_internal.h"
56
57 static struct kmem_cache *cl_env_kmem;
58 struct kmem_cache *cl_dio_aio_kmem;
59 struct kmem_cache *cl_sub_dio_kmem;
60 struct kmem_cache *cl_page_kmem_array[16];
61 unsigned short cl_page_kmem_size_array[16];
62
63 /** Lock class of cl_object_header::coh_attr_guard */
64 static struct lock_class_key cl_attr_guard_class;
65
66 /**
67  * Initialize cl_object_header.
68  */
69 int cl_object_header_init(struct cl_object_header *h)
70 {
71         int result;
72
73         ENTRY;
74         result = lu_object_header_init(&h->coh_lu);
75         if (result == 0) {
76                 spin_lock_init(&h->coh_attr_guard);
77                 lockdep_set_class(&h->coh_attr_guard, &cl_attr_guard_class);
78                 h->coh_page_bufsize = 0;
79         }
80         RETURN(result);
81 }
82 EXPORT_SYMBOL(cl_object_header_init);
83
84 /**
85  * Finalize cl_object_header.
86  */
87 void cl_object_header_fini(struct cl_object_header *h)
88 {
89         lu_object_header_fini(&h->coh_lu);
90 }
91
92 /**
93  * Returns a cl_object with a given \a fid.
94  *
95  * Returns either cached or newly created object. Additional reference on the
96  * returned object is acquired.
97  *
98  * \see lu_object_find(), cl_page_find(), cl_lock_find()
99  */
100 struct cl_object *cl_object_find(const struct lu_env *env,
101                                  struct cl_device *cd, const struct lu_fid *fid,
102                                  const struct cl_object_conf *c)
103 {
104         might_sleep();
105         return lu2cl(lu_object_find_slice(env, cl2lu_dev(cd), fid, &c->coc_lu));
106 }
107 EXPORT_SYMBOL(cl_object_find);
108
109 /**
110  * Releases a reference on \a o.
111  *
112  * When last reference is released object is returned to the cache, unless
113  * lu_object_header_flags::LU_OBJECT_HEARD_BANSHEE bit is set in its header.
114  *
115  * \see cl_page_put(), cl_lock_put().
116  */
117 void cl_object_put(const struct lu_env *env, struct cl_object *o)
118 {
119         lu_object_put(env, &o->co_lu);
120 }
121 EXPORT_SYMBOL(cl_object_put);
122
123 /**
124  * Acquire an additional reference to the object \a o.
125  *
126  * This can only be used to acquire _additional_ reference, i.e., caller
127  * already has to possess at least one reference to \a o before calling this.
128  *
129  * \see cl_page_get(), cl_lock_get().
130  */
131 void cl_object_get(struct cl_object *o)
132 {
133         lu_object_get(&o->co_lu);
134 }
135 EXPORT_SYMBOL(cl_object_get);
136
137 /**
138  * Returns the top-object for a given \a o.
139  *
140  * \see cl_io_top()
141  */
142 struct cl_object *cl_object_top(struct cl_object *o)
143 {
144         struct cl_object_header *hdr = cl_object_header(o);
145         struct cl_object *top;
146
147         while (hdr->coh_parent != NULL)
148                 hdr = hdr->coh_parent;
149
150         top = lu2cl(lu_object_top(&hdr->coh_lu));
151         CDEBUG(D_TRACE, "%p -> %p\n", o, top);
152         return top;
153 }
154 EXPORT_SYMBOL(cl_object_top);
155
156 /**
157  * Returns pointer to the lock protecting data-attributes for the given object
158  * \a o.
159  *
160  * Data-attributes are protected by the cl_object_header::coh_attr_guard
161  * spin-lock in the top-object.
162  *
163  * \see cl_attr, cl_object_attr_lock(), cl_object_operations::coo_attr_get().
164  */
165 static spinlock_t *cl_object_attr_guard(struct cl_object *o)
166 {
167         return &cl_object_header(cl_object_top(o))->coh_attr_guard;
168 }
169
170 /**
171  * Locks data-attributes.
172  *
173  * Prevents data-attributes from changing, until lock is released by
174  * cl_object_attr_unlock(). This has to be called before calls to
175  * cl_object_attr_get(), cl_object_attr_update().
176  */
177 void cl_object_attr_lock(struct cl_object *o)
178 __acquires(cl_object_attr_guard(o))
179 {
180         spin_lock(cl_object_attr_guard(o));
181 }
182 EXPORT_SYMBOL(cl_object_attr_lock);
183
184 /**
185  * Releases data-attributes lock, acquired by cl_object_attr_lock().
186  */
187 void cl_object_attr_unlock(struct cl_object *o)
188 __releases(cl_object_attr_guard(o))
189 {
190         spin_unlock(cl_object_attr_guard(o));
191 }
192 EXPORT_SYMBOL(cl_object_attr_unlock);
193
194 /**
195  * Returns data-attributes of an object \a obj.
196  *
197  * Every layer is asked (by calling cl_object_operations::coo_attr_get())
198  * top-to-bottom to fill in parts of \a attr that this layer is responsible
199  * for.
200  */
201 int cl_object_attr_get(const struct lu_env *env, struct cl_object *top,
202                         struct cl_attr *attr)
203 {
204         struct cl_object *obj;
205         int result = 0;
206
207         assert_spin_locked(cl_object_attr_guard(top));
208         ENTRY;
209
210         cl_object_for_each(obj, top) {
211                 if (obj->co_ops->coo_attr_get != NULL) {
212                         result = obj->co_ops->coo_attr_get(env, obj, attr);
213                         if (result != 0) {
214                                 if (result > 0)
215                                         result = 0;
216                                 break;
217                         }
218                 }
219         }
220         RETURN(result);
221 }
222 EXPORT_SYMBOL(cl_object_attr_get);
223
224 /**
225  * Updates data-attributes of an object \a obj.
226  *
227  * Only attributes, mentioned in a validness bit-mask \a v are
228  * updated. Calls cl_object_operations::coo_upd_attr() on every layer, bottom
229  * to top.
230  */
231 int cl_object_attr_update(const struct lu_env *env, struct cl_object *top,
232                           const struct cl_attr *attr, unsigned v)
233 {
234         struct cl_object *obj;
235         int result = 0;
236
237         assert_spin_locked(cl_object_attr_guard(top));
238         ENTRY;
239
240         cl_object_for_each_reverse(obj, top) {
241                 if (obj->co_ops->coo_attr_update != NULL) {
242                         result = obj->co_ops->coo_attr_update(env, obj, attr,
243                                                               v);
244                         if (result != 0) {
245                                 if (result > 0)
246                                         result = 0;
247                                 break;
248                         }
249                 }
250         }
251         RETURN(result);
252 }
253 EXPORT_SYMBOL(cl_object_attr_update);
254
255 /**
256  * Mark the inode as dirty when the inode has uncommitted (unstable) pages.
257  * Thus when the system is under momory pressure, it will trigger writeback
258  * on background to commit and unpin the pages.
259  */
260 void cl_object_dirty_for_sync(const struct lu_env *env, struct cl_object *top)
261 {
262         struct cl_object *obj;
263
264         ENTRY;
265
266         cl_object_for_each(obj, top) {
267                 if (obj->co_ops->coo_dirty_for_sync != NULL)
268                         obj->co_ops->coo_dirty_for_sync(env, obj);
269         }
270         EXIT;
271 }
272 EXPORT_SYMBOL(cl_object_dirty_for_sync);
273
274 /**
275  * Notifies layers (bottom-to-top) that glimpse AST was received.
276  *
277  * Layers have to fill \a lvb fields with information that will be shipped
278  * back to glimpse issuer.
279  *
280  * \see cl_lock_operations::clo_glimpse()
281  */
282 int cl_object_glimpse(const struct lu_env *env, struct cl_object *top,
283                       struct ost_lvb *lvb)
284 {
285         struct cl_object *obj;
286         int result = 0;
287
288         ENTRY;
289         cl_object_for_each_reverse(obj, top) {
290                 if (obj->co_ops->coo_glimpse != NULL) {
291                         result = obj->co_ops->coo_glimpse(env, obj, lvb);
292                         if (result != 0)
293                                 break;
294                 }
295         }
296         LU_OBJECT_HEADER(D_DLMTRACE, env, lu_object_top(top->co_lu.lo_header),
297                          "size: %llu mtime: %llu atime: %llu "
298                          "ctime: %llu blocks: %llu\n",
299                          lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime,
300                          lvb->lvb_ctime, lvb->lvb_blocks);
301         RETURN(result);
302 }
303 EXPORT_SYMBOL(cl_object_glimpse);
304
305 /**
306  * Updates a configuration of an object \a obj.
307  */
308 int cl_conf_set(const struct lu_env *env, struct cl_object *top,
309                 const struct cl_object_conf *conf)
310 {
311         struct cl_object *obj;
312         int result = 0;
313
314         ENTRY;
315         cl_object_for_each(obj, top) {
316                 if (obj->co_ops->coo_conf_set != NULL) {
317                         result = obj->co_ops->coo_conf_set(env, obj, conf);
318                         if (result)
319                                 break;
320                 }
321         }
322         RETURN(result);
323 }
324 EXPORT_SYMBOL(cl_conf_set);
325
326 /**
327  * Prunes caches of pages and locks for this object.
328  */
329 int cl_object_prune(const struct lu_env *env, struct cl_object *top)
330 {
331         struct cl_object *obj;
332         int result = 0;
333         ENTRY;
334
335         cl_object_for_each(obj, top) {
336                 if (obj->co_ops->coo_prune != NULL) {
337                         result = obj->co_ops->coo_prune(env, obj);
338                         if (result)
339                                 break;
340                 }
341         }
342
343         RETURN(result);
344 }
345 EXPORT_SYMBOL(cl_object_prune);
346
347 /**
348  * Get stripe information of this object.
349  */
350 int cl_object_getstripe(const struct lu_env *env, struct cl_object *top,
351                         struct lov_user_md __user *uarg, size_t size)
352 {
353         struct cl_object *obj;
354         int result = 0;
355         ENTRY;
356
357         cl_object_for_each(obj, top) {
358                 if (obj->co_ops->coo_getstripe) {
359                         result = obj->co_ops->coo_getstripe(env, obj, uarg,
360                                                             size);
361                         if (result)
362                                 break;
363                 }
364         }
365         RETURN(result);
366 }
367 EXPORT_SYMBOL(cl_object_getstripe);
368
369 /**
370  * Get fiemap extents from file object.
371  *
372  * \param env [in]      lustre environment
373  * \param obj [in]      file object
374  * \param key [in]      fiemap request argument
375  * \param fiemap [out]  fiemap extents mapping retrived
376  * \param buflen [in]   max buffer length of @fiemap
377  *
378  * \retval 0    success
379  * \retval < 0  error
380  */
381 int cl_object_fiemap(const struct lu_env *env, struct cl_object *top,
382                      struct ll_fiemap_info_key *key,
383                      struct fiemap *fiemap, size_t *buflen)
384 {
385         struct cl_object *obj;
386         int result = 0;
387         ENTRY;
388
389         cl_object_for_each(obj, top) {
390                 if (obj->co_ops->coo_fiemap) {
391                         result = obj->co_ops->coo_fiemap(env, obj, key, fiemap,
392                                                          buflen);
393                         if (result)
394                                 break;
395                 }
396         }
397         RETURN(result);
398 }
399 EXPORT_SYMBOL(cl_object_fiemap);
400
401 int cl_object_layout_get(const struct lu_env *env, struct cl_object *top,
402                          struct cl_layout *cl)
403 {
404         struct cl_object *obj;
405         ENTRY;
406
407         cl_object_for_each(obj, top) {
408                 if (obj->co_ops->coo_layout_get)
409                         return obj->co_ops->coo_layout_get(env, obj, cl);
410         }
411
412         RETURN(-EOPNOTSUPP);
413 }
414 EXPORT_SYMBOL(cl_object_layout_get);
415
416 loff_t cl_object_maxbytes(struct cl_object *top)
417 {
418         struct cl_object *obj;
419         loff_t maxbytes = LLONG_MAX;
420         ENTRY;
421
422         cl_object_for_each(obj, top) {
423                 if (obj->co_ops->coo_maxbytes)
424                         maxbytes = min_t(loff_t, obj->co_ops->coo_maxbytes(obj),
425                                          maxbytes);
426         }
427
428         RETURN(maxbytes);
429 }
430 EXPORT_SYMBOL(cl_object_maxbytes);
431
432 int cl_object_flush(const struct lu_env *env, struct cl_object *top,
433                          struct ldlm_lock *lock)
434 {
435         struct cl_object *obj;
436         int rc = 0;
437         ENTRY;
438
439         cl_object_for_each(obj, top) {
440                 if (obj->co_ops->coo_object_flush) {
441                         rc = obj->co_ops->coo_object_flush(env, obj, lock);
442                         if (rc)
443                                 break;
444                 }
445         }
446         RETURN(rc);
447 }
448 EXPORT_SYMBOL(cl_object_flush);
449
450 int cl_object_inode_ops(const struct lu_env *env, struct cl_object *top,
451                         enum coo_inode_opc opc, void *data)
452 {
453         struct cl_object *obj;
454         int rc = 0;
455
456         ENTRY;
457
458         cl_object_for_each(obj, top) {
459                 if (obj->co_ops->coo_inode_ops) {
460                         rc = obj->co_ops->coo_inode_ops(env, obj, opc, data);
461                         if (rc)
462                                 break;
463                 }
464         }
465         RETURN(rc);
466 }
467 EXPORT_SYMBOL(cl_object_inode_ops);
468
469 /**
470  * Helper function removing all object locks, and marking object for
471  * deletion. All object pages must have been deleted at this point.
472  *
473  * This is called by cl_inode_fini() and lov_object_delete() to destroy top-
474  * and sub- objects respectively.
475  */
476 void cl_object_kill(const struct lu_env *env, struct cl_object *obj)
477 {
478         struct cl_object_header *hdr = cl_object_header(obj);
479
480         set_bit(LU_OBJECT_HEARD_BANSHEE, &hdr->coh_lu.loh_flags);
481 }
482 EXPORT_SYMBOL(cl_object_kill);
483
484 void cache_stats_init(struct cache_stats *cs, const char *name)
485 {
486         int i;
487
488         cs->cs_name = name;
489         for (i = 0; i < CS_NR; i++)
490                 atomic_set(&cs->cs_stats[i], 0);
491 }
492
493 static int cache_stats_print(const struct cache_stats *cs,
494                              struct seq_file *m, int h)
495 {
496         int i;
497
498         /*
499          *   lookup    hit    total  cached create
500          * env: ...... ...... ...... ...... ......
501          */
502         if (h) {
503                 const char *names[CS_NR] = CS_NAMES;
504
505                 seq_printf(m, "%6s", " ");
506                 for (i = 0; i < CS_NR; i++)
507                         seq_printf(m, "%8s", names[i]);
508                 seq_printf(m, "\n");
509         }
510
511         seq_printf(m, "%5.5s:", cs->cs_name);
512         for (i = 0; i < CS_NR; i++)
513                 seq_printf(m, "%8u", atomic_read(&cs->cs_stats[i]));
514         return 0;
515 }
516
517 static void cl_env_percpu_refill(void);
518
519 /**
520  * Initialize client site.
521  *
522  * Perform common initialization (lu_site_init()), and initialize statistical
523  * counters. Also perform global initializations on the first call.
524  */
525 int cl_site_init(struct cl_site *s, struct cl_device *d)
526 {
527         size_t i;
528         int result;
529
530         result = lu_site_init(&s->cs_lu, &d->cd_lu_dev);
531         if (result == 0) {
532                 cache_stats_init(&s->cs_pages, "pages");
533                 for (i = 0; i < ARRAY_SIZE(s->cs_pages_state); ++i)
534                         atomic_set(&s->cs_pages_state[0], 0);
535                 cl_env_percpu_refill();
536         }
537         return result;
538 }
539 EXPORT_SYMBOL(cl_site_init);
540
541 /**
542  * Finalize client site. Dual to cl_site_init().
543  */
544 void cl_site_fini(struct cl_site *s)
545 {
546         lu_site_fini(&s->cs_lu);
547 }
548 EXPORT_SYMBOL(cl_site_fini);
549
550 static struct cache_stats cl_env_stats = {
551         .cs_name    = "envs",
552         .cs_stats = { ATOMIC_INIT(0), }
553 };
554
555 /**
556  * Outputs client site statistical counters into a buffer. Suitable for
557  * ll_rd_*()-style functions.
558  */
559 int cl_site_stats_print(const struct cl_site *site, struct seq_file *m)
560 {
561         static const char *const pstate[] = {
562                 [CPS_CACHED]    = "c",
563                 [CPS_OWNED]     = "o",
564                 [CPS_PAGEOUT]   = "w",
565                 [CPS_PAGEIN]    = "r",
566                 [CPS_FREEING]   = "f"
567         };
568         size_t i;
569
570 /*
571        lookup    hit  total   busy create
572 pages: ...... ...... ...... ...... ...... [...... ...... ...... ......]
573 locks: ...... ...... ...... ...... ...... [...... ...... ...... ...... ......]
574   env: ...... ...... ...... ...... ......
575  */
576         lu_site_stats_seq_print(&site->cs_lu, m);
577         cache_stats_print(&site->cs_pages, m, 1);
578         seq_printf(m, " [");
579         for (i = 0; i < ARRAY_SIZE(site->cs_pages_state); ++i)
580                 seq_printf(m, "%s: %u ", pstate[i],
581                            atomic_read(&site->cs_pages_state[i]));
582         seq_printf(m, "]\n");
583         cache_stats_print(&cl_env_stats, m, 0);
584         seq_printf(m, "\n");
585         return 0;
586 }
587 EXPORT_SYMBOL(cl_site_stats_print);
588
589 /*****************************************************************************
590  *
591  * lu_env handling on client.
592  *
593  */
594
595 static unsigned cl_envs_cached_max = 32; /* XXX: prototype: arbitrary limit
596                                           * for now. */
597 static struct cl_env_cache {
598         rwlock_t                cec_guard;
599         unsigned                cec_count;
600         struct list_head        cec_envs;
601 } *cl_envs = NULL;
602
603 struct cl_env {
604         void             *ce_magic;
605         struct lu_env     ce_lu;
606         struct lu_context ce_ses;
607
608         /*
609          * Linkage into global list of all client environments. Used for
610          * garbage collection.
611          */
612         struct list_head  ce_linkage;
613         /*
614          *
615          */
616         int               ce_ref;
617         /*
618          * Debugging field: address of the caller who made original
619          * allocation.
620          */
621         void             *ce_debug;
622 };
623
624 static void cl_env_inc(enum cache_stats_item item)
625 {
626 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
627         atomic_inc(&cl_env_stats.cs_stats[item]);
628 #endif
629 }
630
631 static void cl_env_dec(enum cache_stats_item item)
632 {
633 #ifdef CONFIG_DEBUG_PAGESTATE_TRACKING
634         LASSERT(atomic_read(&cl_env_stats.cs_stats[item]) > 0);
635         atomic_dec(&cl_env_stats.cs_stats[item]);
636 #endif
637 }
638
639 static void cl_env_init0(struct cl_env *cle, void *debug)
640 {
641         LASSERT(cle->ce_ref == 0);
642         LASSERT(cle->ce_magic == &cl_env_init0);
643         LASSERT(cle->ce_debug == NULL);
644
645         cle->ce_ref = 1;
646         cle->ce_debug = debug;
647         cl_env_inc(CS_busy);
648 }
649
650 static struct lu_env *cl_env_new(__u32 ctx_tags, __u32 ses_tags, void *debug)
651 {
652         struct lu_env *env;
653         struct cl_env *cle;
654
655         OBD_SLAB_ALLOC_PTR_GFP(cle, cl_env_kmem, GFP_NOFS);
656         if (cle != NULL) {
657                 int rc;
658
659                 INIT_LIST_HEAD(&cle->ce_linkage);
660                 cle->ce_magic = &cl_env_init0;
661                 env = &cle->ce_lu;
662                 rc = lu_env_init(env, LCT_CL_THREAD|ctx_tags);
663                 if (rc == 0) {
664                         rc = lu_context_init(&cle->ce_ses,
665                                              LCT_SESSION | ses_tags);
666                         if (rc == 0) {
667                                 lu_context_enter(&cle->ce_ses);
668                                 env->le_ses = &cle->ce_ses;
669                                 cl_env_init0(cle, debug);
670                         } else
671                                 lu_env_fini(env);
672                 }
673                 if (rc != 0) {
674                         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
675                         env = ERR_PTR(rc);
676                 } else {
677                         cl_env_inc(CS_create);
678                         cl_env_inc(CS_total);
679                 }
680         } else
681                 env = ERR_PTR(-ENOMEM);
682         return env;
683 }
684
685 static void cl_env_fini(struct cl_env *cle)
686 {
687         cl_env_dec(CS_total);
688         lu_context_fini(&cle->ce_lu.le_ctx);
689         lu_context_fini(&cle->ce_ses);
690         OBD_SLAB_FREE_PTR(cle, cl_env_kmem);
691 }
692
693 /* Get a cl_env, either from the per-CPU cache for the current CPU, or by
694  * allocating a new one.
695  */
696 static struct lu_env *cl_env_obtain(void *debug)
697 {
698         struct cl_env *cle;
699         struct lu_env *env;
700         int cpu = get_cpu();
701
702         ENTRY;
703
704         read_lock(&cl_envs[cpu].cec_guard);
705         LASSERT(equi(cl_envs[cpu].cec_count == 0,
706                 list_empty(&cl_envs[cpu].cec_envs)));
707         if (cl_envs[cpu].cec_count > 0) {
708                 int rc;
709
710                 cle = container_of(cl_envs[cpu].cec_envs.next, struct cl_env,
711                                    ce_linkage);
712                 list_del_init(&cle->ce_linkage);
713                 cl_envs[cpu].cec_count--;
714                 read_unlock(&cl_envs[cpu].cec_guard);
715                 put_cpu();
716
717                 env = &cle->ce_lu;
718                 rc = lu_env_refill(env);
719                 if (rc == 0) {
720                         cl_env_init0(cle, debug);
721                         lu_context_enter(&env->le_ctx);
722                         lu_context_enter(&cle->ce_ses);
723                 } else {
724                         cl_env_fini(cle);
725                         env = ERR_PTR(rc);
726                 }
727         } else {
728                 read_unlock(&cl_envs[cpu].cec_guard);
729                 put_cpu();
730                 env = cl_env_new(lu_context_tags_default,
731                                  lu_session_tags_default, debug);
732         }
733         RETURN(env);
734 }
735
736 static inline struct cl_env *cl_env_container(struct lu_env *env)
737 {
738         return container_of(env, struct cl_env, ce_lu);
739 }
740
741 /**
742  * Returns an lu_env.
743  *
744  * No link to thread, this returns an env from the cache or
745  * allocates a new one.
746  *
747  * If you need to get the specific environment you created for this thread,
748  * you must either pass the pointer directly or store it in the file/inode
749  * private data and retrieve it from there using ll_cl_add/ll_cl_find.
750  *
751  * \param refcheck pointer to a counter used to detect environment leaks. In
752  * the usual case cl_env_get() and cl_env_put() are called in the same lexical
753  * scope and pointer to the same integer is passed as \a refcheck. This is
754  * used to detect missed cl_env_put().
755  *
756  * \see cl_env_put()
757  */
758 struct lu_env *cl_env_get(__u16 *refcheck)
759 {
760         struct lu_env *env;
761
762         env = cl_env_obtain(__builtin_return_address(0));
763         if (!IS_ERR(env)) {
764                 struct cl_env *cle;
765
766                 cle = cl_env_container(env);
767                 *refcheck = cle->ce_ref;
768                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
769         }
770         return env;
771 }
772 EXPORT_SYMBOL(cl_env_get);
773
774 /**
775  * Forces an allocation of a fresh environment with given tags.
776  *
777  * \see cl_env_get()
778  */
779 struct lu_env *cl_env_alloc(__u16 *refcheck, __u32 tags)
780 {
781         struct lu_env *env;
782
783         env = cl_env_new(tags, tags, __builtin_return_address(0));
784         if (!IS_ERR(env)) {
785                 struct cl_env *cle;
786
787                 cle = cl_env_container(env);
788                 *refcheck = cle->ce_ref;
789                 CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
790         }
791         return env;
792 }
793 EXPORT_SYMBOL(cl_env_alloc);
794
795 static void cl_env_exit(struct cl_env *cle)
796 {
797         lu_context_exit(&cle->ce_lu.le_ctx);
798         lu_context_exit(&cle->ce_ses);
799 }
800
801 /**
802  * Finalizes and frees a given number of cached environments. This is done to
803  * (1) free some memory (not currently hooked into VM), or (2) release
804  * references to modules.
805  */
806 unsigned cl_env_cache_purge(unsigned nr)
807 {
808         struct cl_env *cle;
809         unsigned i;
810
811         ENTRY;
812         for_each_possible_cpu(i) {
813                 write_lock(&cl_envs[i].cec_guard);
814                 for (; !list_empty(&cl_envs[i].cec_envs) && nr > 0; --nr) {
815                         cle = container_of(cl_envs[i].cec_envs.next,
816                                            struct cl_env, ce_linkage);
817                         list_del_init(&cle->ce_linkage);
818                         LASSERT(cl_envs[i].cec_count > 0);
819                         cl_envs[i].cec_count--;
820                         write_unlock(&cl_envs[i].cec_guard);
821
822                         cl_env_fini(cle);
823                         write_lock(&cl_envs[i].cec_guard);
824                 }
825                 LASSERT(equi(cl_envs[i].cec_count == 0,
826                         list_empty(&cl_envs[i].cec_envs)));
827                 write_unlock(&cl_envs[i].cec_guard);
828         }
829         RETURN(nr);
830 }
831 EXPORT_SYMBOL(cl_env_cache_purge);
832
833 /**
834  * Release an environment.
835  *
836  * Decrement \a env reference counter. When counter drops to 0, nothing in
837  * this thread is using environment and it is returned to the per-CPU cache or
838  * freed immediately if the cache is full.
839  */
840 void cl_env_put(struct lu_env *env, __u16 *refcheck)
841 {
842         struct cl_env *cle;
843
844         cle = cl_env_container(env);
845
846         LASSERT(cle->ce_ref > 0);
847         LASSERT(ergo(refcheck != NULL, cle->ce_ref == *refcheck));
848
849         CDEBUG(D_OTHER, "%d@%p\n", cle->ce_ref, cle);
850         if (--cle->ce_ref == 0) {
851                 int cpu = get_cpu();
852
853                 cl_env_dec(CS_busy);
854                 cle->ce_debug = NULL;
855                 cl_env_exit(cle);
856                 /*
857                  * Don't bother to take a lock here.
858                  *
859                  * Return environment to the cache only when it was allocated
860                  * with the standard tags.
861                  */
862                 if (cl_envs[cpu].cec_count < cl_envs_cached_max &&
863                     (env->le_ctx.lc_tags & ~LCT_HAS_EXIT) == lu_context_tags_default &&
864                     (env->le_ses->lc_tags & ~LCT_HAS_EXIT) == lu_session_tags_default) {
865                         read_lock(&cl_envs[cpu].cec_guard);
866                         list_add(&cle->ce_linkage, &cl_envs[cpu].cec_envs);
867                         cl_envs[cpu].cec_count++;
868                         read_unlock(&cl_envs[cpu].cec_guard);
869                 } else
870                         cl_env_fini(cle);
871                 put_cpu();
872         }
873 }
874 EXPORT_SYMBOL(cl_env_put);
875
876 /**
877  * Converts struct cl_attr to struct ost_lvb.
878  *
879  * \see cl_lvb2attr
880  */
881 void cl_attr2lvb(struct ost_lvb *lvb, const struct cl_attr *attr)
882 {
883         lvb->lvb_size   = attr->cat_size;
884         lvb->lvb_mtime  = attr->cat_mtime;
885         lvb->lvb_atime  = attr->cat_atime;
886         lvb->lvb_ctime  = attr->cat_ctime;
887         lvb->lvb_blocks = attr->cat_blocks;
888 }
889
890 /**
891  * Converts struct ost_lvb to struct cl_attr.
892  *
893  * \see cl_attr2lvb
894  */
895 void cl_lvb2attr(struct cl_attr *attr, const struct ost_lvb *lvb)
896 {
897         attr->cat_size   = lvb->lvb_size;
898         attr->cat_mtime  = lvb->lvb_mtime;
899         attr->cat_atime  = lvb->lvb_atime;
900         attr->cat_ctime  = lvb->lvb_ctime;
901         attr->cat_blocks = lvb->lvb_blocks;
902 }
903 EXPORT_SYMBOL(cl_lvb2attr);
904
905 static struct cl_env cl_env_percpu[NR_CPUS];
906 static DEFINE_MUTEX(cl_env_percpu_mutex);
907
908 static int cl_env_percpu_init(void)
909 {
910         struct cl_env *cle;
911         int tags = LCT_REMEMBER | LCT_NOREF;
912         int i, j;
913         int rc = 0;
914
915         for_each_possible_cpu(i) {
916                 struct lu_env *env;
917
918                 rwlock_init(&cl_envs[i].cec_guard);
919                 INIT_LIST_HEAD(&cl_envs[i].cec_envs);
920                 cl_envs[i].cec_count = 0;
921
922                 cle = &cl_env_percpu[i];
923                 env = &cle->ce_lu;
924
925                 INIT_LIST_HEAD(&cle->ce_linkage);
926                 cle->ce_magic = &cl_env_init0;
927                 rc = lu_env_init(env, LCT_CL_THREAD | tags);
928                 if (rc == 0) {
929                         rc = lu_context_init(&cle->ce_ses, LCT_SESSION | tags);
930                         if (rc == 0) {
931                                 lu_context_enter(&cle->ce_ses);
932                                 env->le_ses = &cle->ce_ses;
933                         } else {
934                                 lu_env_fini(env);
935                         }
936                 }
937                 if (rc != 0)
938                         break;
939         }
940         if (rc != 0) {
941                 /* Indices 0 to i (excluding i) were correctly initialized,
942                  * thus we must uninitialize up to i, the rest are undefined. */
943                 for (j = 0; j < i; j++) {
944                         cle = &cl_env_percpu[j];
945                         lu_context_exit(&cle->ce_ses);
946                         lu_context_fini(&cle->ce_ses);
947                         lu_env_fini(&cle->ce_lu);
948                 }
949         }
950
951         return rc;
952 }
953
954 static void cl_env_percpu_fini(void)
955 {
956         int i;
957
958         for_each_possible_cpu(i) {
959                 struct cl_env *cle = &cl_env_percpu[i];
960
961                 lu_context_exit(&cle->ce_ses);
962                 lu_context_fini(&cle->ce_ses);
963                 lu_env_fini(&cle->ce_lu);
964         }
965 }
966
967 static void cl_env_percpu_refill(void)
968 {
969         int i;
970
971         mutex_lock(&cl_env_percpu_mutex);
972         for_each_possible_cpu(i)
973                 lu_env_refill(&cl_env_percpu[i].ce_lu);
974         mutex_unlock(&cl_env_percpu_mutex);
975 }
976
977 void cl_env_percpu_put(struct lu_env *env)
978 {
979         struct cl_env *cle;
980         int cpu;
981
982         cpu = smp_processor_id();
983         cle = cl_env_container(env);
984         LASSERT(cle == &cl_env_percpu[cpu]);
985
986         cle->ce_ref--;
987         LASSERT(cle->ce_ref == 0);
988
989         cl_env_dec(CS_busy);
990         cle->ce_debug = NULL;
991
992         put_cpu();
993 }
994 EXPORT_SYMBOL(cl_env_percpu_put);
995
996 struct lu_env *cl_env_percpu_get(void)
997 {
998         struct cl_env *cle;
999
1000         cle = &cl_env_percpu[get_cpu()];
1001         cl_env_init0(cle, __builtin_return_address(0));
1002
1003         return &cle->ce_lu;
1004 }
1005 EXPORT_SYMBOL(cl_env_percpu_get);
1006
1007 /*****************************************************************************
1008  *
1009  * Temporary prototype thing: mirror obd-devices into cl devices.
1010  *
1011  */
1012
1013 struct cl_device *cl_type_setup(const struct lu_env *env, struct lu_site *site,
1014                                 struct lu_device_type *ldt,
1015                                 struct lu_device *next)
1016 {
1017         const char       *typename;
1018         struct lu_device *d;
1019
1020         LASSERT(ldt != NULL);
1021
1022         typename = ldt->ldt_name;
1023         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, NULL);
1024         if (!IS_ERR(d)) {
1025                 int rc;
1026
1027                 if (site != NULL)
1028                         d->ld_site = site;
1029                 rc = ldt->ldt_ops->ldto_device_init(env, d, typename, next);
1030                 if (rc == 0) {
1031                         lu_device_get(d);
1032                         lu_ref_add(&d->ld_reference,
1033                                    "lu-stack", &lu_site_init);
1034                 } else {
1035                         ldt->ldt_ops->ldto_device_free(env, d);
1036                         CERROR("can't init device '%s', %d\n", typename, rc);
1037                         d = ERR_PTR(rc);
1038                 }
1039         } else
1040                 CERROR("Cannot allocate device: '%s'\n", typename);
1041         return lu2cl_dev(d);
1042 }
1043 EXPORT_SYMBOL(cl_type_setup);
1044
1045 /**
1046  * Finalize device stack by calling lu_stack_fini().
1047  */
1048 void cl_stack_fini(const struct lu_env *env, struct cl_device *cl)
1049 {
1050         lu_stack_fini(env, cl2lu_dev(cl));
1051 }
1052 EXPORT_SYMBOL(cl_stack_fini);
1053
1054 static struct lu_context_key cl_key;
1055
1056 struct cl_thread_info *cl_env_info(const struct lu_env *env)
1057 {
1058         return lu_context_key_get(&env->le_ctx, &cl_key);
1059 }
1060
1061 /* defines cl_key_{init,fini}() */
1062 LU_KEY_INIT_FINI(cl, struct cl_thread_info);
1063
1064 static struct lu_context_key cl_key = {
1065         .lct_tags = LCT_CL_THREAD,
1066         .lct_init = cl_key_init,
1067         .lct_fini = cl_key_fini,
1068 };
1069
1070 static struct lu_kmem_descr cl_object_caches[] = {
1071         {
1072                 .ckd_cache = &cl_env_kmem,
1073                 .ckd_name  = "cl_env_kmem",
1074                 .ckd_size  = sizeof(struct cl_env)
1075         },
1076         {
1077                 .ckd_cache = &cl_dio_aio_kmem,
1078                 .ckd_name  = "cl_dio_aio_kmem",
1079                 .ckd_size  = sizeof(struct cl_dio_aio)
1080         },
1081         {
1082                 .ckd_cache = &cl_sub_dio_kmem,
1083                 .ckd_name  = "cl_sub_dio_kmem",
1084                 .ckd_size  = sizeof(struct cl_sub_dio)
1085         },
1086         {
1087                 .ckd_cache = NULL
1088         }
1089 };
1090
1091 /**
1092  * Global initialization of cl-data. Create kmem caches, register
1093  * lu_context_key's, etc.
1094  *
1095  * \see cl_global_fini()
1096  */
1097 int cl_global_init(void)
1098 {
1099         int result;
1100
1101         OBD_ALLOC_PTR_ARRAY(cl_envs, num_possible_cpus());
1102         if (cl_envs == NULL)
1103                 GOTO(out, result = -ENOMEM);
1104
1105         result = lu_kmem_init(cl_object_caches);
1106         if (result)
1107                 GOTO(out_envs, result);
1108
1109         LU_CONTEXT_KEY_INIT(&cl_key);
1110         result = lu_context_key_register(&cl_key);
1111         if (result)
1112                 GOTO(out_kmem, result);
1113
1114         result = cl_env_percpu_init();
1115         if (result) /* no cl_env_percpu_fini on error */
1116                 GOTO(out_keys, result);
1117
1118         return 0;
1119
1120 out_keys:
1121         lu_context_key_degister(&cl_key);
1122 out_kmem:
1123         lu_kmem_fini(cl_object_caches);
1124 out_envs:
1125         OBD_FREE_PTR_ARRAY(cl_envs, num_possible_cpus());
1126 out:
1127         return result;
1128 }
1129
1130 /**
1131  * Finalization of global cl-data. Dual to cl_global_init().
1132  */
1133 void cl_global_fini(void)
1134 {
1135         int i;
1136
1137         for (i = 0; i < ARRAY_SIZE(cl_page_kmem_array); i++) {
1138                 if (cl_page_kmem_array[i]) {
1139                         kmem_cache_destroy(cl_page_kmem_array[i]);
1140                         cl_page_kmem_array[i] = NULL;
1141                 }
1142         }
1143         cl_env_percpu_fini();
1144         lu_context_key_degister(&cl_key);
1145         lu_kmem_fini(cl_object_caches);
1146         OBD_FREE_PTR_ARRAY(cl_envs, num_possible_cpus());
1147 }