Whamcloud - gitweb
LU-1301 lu: local objects library
[fs/lustre-release.git] / lustre / obdclass / local_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012 Whamcloud, Inc.
24  */
25 /*
26  * lustre/obdclass/local_storage.c
27  *
28  * Local storage for file/objects with fid generation. Works on top of OSD.
29  *
30  * Author: Mikhail Pershin <mike.pershin@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_CLASS
34
35 #include "local_storage.h"
36
37 /* all initialized local storages on this node are linked on this */
38 static CFS_LIST_HEAD(ls_list_head);
39 static CFS_DEFINE_MUTEX(ls_list_mutex);
40
41 static int ls_object_init(const struct lu_env *env, struct lu_object *o,
42                           const struct lu_object_conf *unused)
43 {
44         struct ls_device        *ls;
45         struct lu_object        *below;
46         struct lu_device        *under;
47
48         ENTRY;
49
50         ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev);
51         under = &ls->ls_osd->dd_lu_dev;
52         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
53         if (below == NULL)
54                 RETURN(-ENOMEM);
55
56         lu_object_add(o, below);
57
58         RETURN(0);
59 }
60
61 static void ls_object_free(const struct lu_env *env, struct lu_object *o)
62 {
63         struct ls_object        *obj = lu2ls_obj(o);
64         struct lu_object_header *h = o->lo_header;
65
66         dt_object_fini(&obj->ls_obj);
67         lu_object_header_fini(h);
68         OBD_FREE_PTR(obj);
69 }
70
71 struct lu_object_operations ls_lu_obj_ops = {
72         .loo_object_init  = ls_object_init,
73         .loo_object_free  = ls_object_free,
74 };
75
76 struct lu_object *ls_object_alloc(const struct lu_env *env,
77                                   const struct lu_object_header *_h,
78                                   struct lu_device *d)
79 {
80         struct lu_object_header *h;
81         struct ls_object        *o;
82         struct lu_object        *l;
83
84         LASSERT(_h == NULL);
85
86         OBD_ALLOC_PTR(o);
87         if (o != NULL) {
88                 l = &o->ls_obj.do_lu;
89                 h = &o->ls_header;
90
91                 lu_object_header_init(h);
92                 dt_object_init(&o->ls_obj, h, d);
93                 lu_object_add_top(h, l);
94
95                 l->lo_ops = &ls_lu_obj_ops;
96
97                 return l;
98         } else {
99                 return NULL;
100         }
101 }
102
103 static struct lu_device_operations ls_lu_dev_ops = {
104         .ldo_object_alloc =     ls_object_alloc
105 };
106
107 static struct ls_device *__ls_find_dev(struct dt_device *dev)
108 {
109         struct ls_device *ls, *ret = NULL;
110
111         cfs_list_for_each_entry(ls, &ls_list_head, ls_linkage) {
112                 if (ls->ls_osd == dev) {
113                         cfs_atomic_inc(&ls->ls_refcount);
114                         ret = ls;
115                         break;
116                 }
117         }
118         return ret;
119 }
120
121 struct ls_device *ls_find_dev(struct dt_device *dev)
122 {
123         struct ls_device *ls;
124
125         cfs_mutex_lock(&ls_list_mutex);
126         ls = __ls_find_dev(dev);
127         cfs_mutex_unlock(&ls_list_mutex);
128
129         return ls;
130 }
131
132 static struct lu_device_type_operations ls_device_type_ops = {
133         .ldto_start = NULL,
134         .ldto_stop  = NULL,
135 };
136
137 static struct lu_device_type ls_lu_type = {
138         .ldt_name = "local_storage",
139         .ldt_ops  = &ls_device_type_ops,
140 };
141
142 static struct ls_device *ls_device_get(const struct lu_env *env,
143                                        struct dt_device *dev)
144 {
145         struct ls_device *ls;
146
147         ENTRY;
148
149         cfs_mutex_lock(&ls_list_mutex);
150         ls = __ls_find_dev(dev);
151         if (ls)
152                 GOTO(out_ls, ls);
153
154         /* not found, then create */
155         OBD_ALLOC_PTR(ls);
156         if (ls == NULL)
157                 GOTO(out_ls, ls = ERR_PTR(-ENOMEM));
158
159         cfs_atomic_set(&ls->ls_refcount, 1);
160         CFS_INIT_LIST_HEAD(&ls->ls_los_list);
161         cfs_mutex_init(&ls->ls_los_mutex);
162
163         ls->ls_osd = dev;
164
165         LASSERT(dev->dd_lu_dev.ld_site);
166         lu_device_init(&ls->ls_top_dev.dd_lu_dev, &ls_lu_type);
167         ls->ls_top_dev.dd_lu_dev.ld_ops = &ls_lu_dev_ops;
168         ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site;
169
170         /* finally add ls to the list */
171         cfs_list_add(&ls->ls_linkage, &ls_list_head);
172 out_ls:
173         cfs_mutex_unlock(&ls_list_mutex);
174         RETURN(ls);
175 }
176
177 static void ls_device_put(const struct lu_env *env, struct ls_device *ls)
178 {
179         LASSERT(env);
180         if (!cfs_atomic_dec_and_test(&ls->ls_refcount))
181                 return;
182
183         cfs_mutex_lock(&ls_list_mutex);
184         if (cfs_atomic_read(&ls->ls_refcount) == 0) {
185                 LASSERT(cfs_list_empty(&ls->ls_los_list));
186                 cfs_list_del(&ls->ls_linkage);
187                 lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0);
188                 lu_device_fini(&ls->ls_top_dev.dd_lu_dev);
189                 OBD_FREE_PTR(ls);
190         }
191         cfs_mutex_unlock(&ls_list_mutex);
192 }
193
194 /**
195  * local file fid generation
196  */
197 int local_object_fid_generate(const struct lu_env *env,
198                               struct local_oid_storage *los,
199                               struct lu_fid *fid)
200 {
201         LASSERT(los->los_dev);
202         LASSERT(los->los_obj);
203
204         /* take next OID */
205
206         /* to make it unique after reboot we store
207          * the latest generated fid atomically with
208          * object creation see local_object_create() */
209
210         cfs_mutex_lock(&los->los_id_lock);
211         fid->f_seq = los->los_seq;
212         fid->f_oid = los->los_last_oid++;
213         fid->f_ver = 0;
214         cfs_mutex_unlock(&los->los_id_lock);
215
216         return 0;
217 }
218
219 int local_object_declare_create(const struct lu_env *env,
220                                 struct local_oid_storage *los,
221                                 struct dt_object *o, struct lu_attr *attr,
222                                 struct dt_object_format *dof,
223                                 struct thandle *th)
224 {
225         struct dt_thread_info   *dti = dt_info(env);
226         int                      rc;
227
228         ENTRY;
229
230         /* update fid generation file */
231         if (los != NULL) {
232                 LASSERT(dt_object_exists(los->los_obj));
233                 rc = dt_declare_record_write(env, los->los_obj,
234                                              sizeof(struct los_ondisk), 0, th);
235                 if (rc)
236                         RETURN(rc);
237         }
238
239         rc = dt_declare_create(env, o, attr, NULL, dof, th);
240         if (rc)
241                 RETURN(rc);
242
243         dti->dti_lb.lb_buf = NULL;
244         dti->dti_lb.lb_len = sizeof(dti->dti_lma);
245         rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th);
246
247         RETURN(rc);
248 }
249
250 int local_object_create(const struct lu_env *env,
251                         struct local_oid_storage *los,
252                         struct dt_object *o, struct lu_attr *attr,
253                         struct dt_object_format *dof, struct thandle *th)
254 {
255         struct dt_thread_info   *dti = dt_info(env);
256         struct los_ondisk        losd;
257         int                      rc;
258
259         ENTRY;
260
261         rc = dt_create(env, o, attr, NULL, dof, th);
262         if (rc)
263                 RETURN(rc);
264
265         lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
266         lustre_lma_swab(&dti->dti_lma);
267         dti->dti_lb.lb_buf = &dti->dti_lma;
268         dti->dti_lb.lb_len = sizeof(dti->dti_lma);
269         rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th,
270                           BYPASS_CAPA);
271
272         if (los == NULL)
273                 RETURN(rc);
274
275         LASSERT(los->los_obj);
276         LASSERT(dt_object_exists(los->los_obj));
277
278         /* many threads can be updated this, serialize
279          * them here to avoid the race where one thread
280          * takes the value first, but writes it last */
281         cfs_mutex_lock(&los->los_id_lock);
282
283         /* update local oid number on disk so that
284          * we know the last one used after reboot */
285         losd.lso_magic = cpu_to_le32(LOS_MAGIC);
286         losd.lso_next_oid = cpu_to_le32(los->los_last_oid);
287
288         dti->dti_off = 0;
289         dti->dti_lb.lb_buf = &losd;
290         dti->dti_lb.lb_len = sizeof(losd);
291         rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
292                              th);
293         cfs_mutex_unlock(&los->los_id_lock);
294
295         RETURN(rc);
296 }
297
298 /*
299  * Create local named object (file, directory or index) in parent directory.
300  */
301 struct dt_object *__local_file_create(const struct lu_env *env,
302                                       const struct lu_fid *fid,
303                                       struct local_oid_storage *los,
304                                       struct ls_device *ls,
305                                       struct dt_object *parent,
306                                       const char *name, __u32 mode)
307 {
308         struct dt_thread_info   *dti = dt_info(env);
309         struct dt_object        *dto;
310         struct thandle          *th;
311         int                      rc;
312
313         dto = ls_locate(env, ls, fid);
314         if (unlikely(IS_ERR(dto)))
315                 RETURN(dto);
316
317         LASSERT(dto != NULL);
318         if (dt_object_exists(dto))
319                 GOTO(out, rc = -EEXIST);
320
321         /* create the object */
322         dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
323         dti->dti_attr.la_mode = mode;
324         dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
325
326         th = dt_trans_create(env, ls->ls_osd);
327         if (IS_ERR(th))
328                 GOTO(out, rc = PTR_ERR(th));
329
330         rc = local_object_declare_create(env, los, dto, &dti->dti_attr,
331                                          &dti->dti_dof, th);
332         if (rc)
333                 GOTO(trans_stop, rc);
334
335         if (dti->dti_dof.dof_type == DFT_DIR) {
336                 dt_declare_ref_add(env, dto, th);
337                 dt_declare_ref_add(env, parent, th);
338         }
339
340         rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th);
341         if (rc)
342                 GOTO(trans_stop, rc);
343
344         rc = dt_trans_start_local(env, ls->ls_osd, th);
345         if (rc)
346                 GOTO(trans_stop, rc);
347
348         dt_write_lock(env, dto, 0);
349         if (dt_object_exists(dto))
350                 GOTO(unlock, rc = 0);
351
352         CDEBUG(D_OTHER, "create new object "DFID"\n",
353                PFID(lu_object_fid(&dto->do_lu)));
354         rc = local_object_create(env, los, dto, &dti->dti_attr,
355                                  &dti->dti_dof, th);
356         if (rc)
357                 GOTO(unlock, rc);
358         LASSERT(dt_object_exists(dto));
359
360         if (dti->dti_dof.dof_type == DFT_DIR) {
361                 if (!dt_try_as_dir(env, dto))
362                         GOTO(destroy, rc = -ENOTDIR);
363                 /* Add "." and ".." for newly created dir */
364                 rc = dt_insert(env, dto, (void *)fid, (void *)".", th,
365                                BYPASS_CAPA, 1);
366                 if (rc)
367                         GOTO(destroy, rc);
368                 dt_ref_add(env, dto, th);
369                 rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu),
370                                (void *)"..", th, BYPASS_CAPA, 1);
371                 if (rc)
372                         GOTO(destroy, rc);
373         }
374
375         dt_write_lock(env, parent, 0);
376         rc = dt_insert(env, parent, (const struct dt_rec *)fid,
377                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
378         if (dti->dti_dof.dof_type == DFT_DIR)
379                 dt_ref_add(env, parent, th);
380         dt_write_unlock(env, parent);
381         if (rc)
382                 GOTO(destroy, rc);
383 destroy:
384         if (rc)
385                 dt_destroy(env, dto, th);
386 unlock:
387         dt_write_unlock(env, dto);
388 trans_stop:
389         dt_trans_stop(env, ls->ls_osd, th);
390 out:
391         if (rc) {
392                 lu_object_put_nocache(env, &dto->do_lu);
393                 dto = ERR_PTR(rc);
394         } else {
395                 struct lu_fid dti_fid;
396                 /* since local files FIDs are not in OI the directory entry
397                  * is used to get inode number/generation, we need to do lookup
398                  * again to cache this data after create */
399                 rc = dt_lookup_dir(env, parent, name, &dti_fid);
400                 LASSERT(rc == 0);
401         }
402         RETURN(dto);
403 }
404
405 /*
406  * Look up and create (if it does not exist) a local named file or directory in
407  * parent directory.
408  */
409 struct dt_object *local_file_find_or_create(const struct lu_env *env,
410                                             struct local_oid_storage *los,
411                                             struct dt_object *parent,
412                                             const char *name, __u32 mode)
413 {
414         struct dt_thread_info   *dti = dt_info(env);
415         struct dt_object        *dto;
416         int                      rc;
417
418         LASSERT(parent);
419
420         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
421         if (rc == 0)
422                 /* name is found, get the object */
423                 dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
424         else if (rc != -ENOENT)
425                 dto = ERR_PTR(rc);
426         else {
427                 rc = local_object_fid_generate(env, los, &dti->dti_fid);
428                 if (rc < 0)
429                         dto = ERR_PTR(rc);
430                 else
431                         dto = __local_file_create(env, &dti->dti_fid, los,
432                                                   dt2ls_dev(los->los_dev),
433                                                   parent, name, mode);
434         }
435         return dto;
436 }
437 EXPORT_SYMBOL(local_file_find_or_create);
438
439 struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
440                                                      struct dt_device *dt,
441                                                      const struct lu_fid *fid,
442                                                      struct dt_object *parent,
443                                                      const char *name,
444                                                      __u32 mode)
445 {
446         struct dt_thread_info   *dti = dt_info(env);
447         struct dt_object        *dto;
448         int                      rc;
449
450         LASSERT(parent);
451
452         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
453         if (rc == 0) {
454                 /* name is found, get the object */
455                 if (!lu_fid_eq(fid, &dti->dti_fid))
456                         dto = ERR_PTR(-EINVAL);
457                 else
458                         dto = dt_locate(env, dt, fid);
459         } else if (rc != -ENOENT) {
460                 dto = ERR_PTR(rc);
461         } else {
462                 struct ls_device *ls;
463
464                 ls = ls_device_get(env, dt);
465                 if (IS_ERR(ls))
466                         dto = ERR_PTR(PTR_ERR(ls));
467                 else
468                         dto = __local_file_create(env, fid, NULL, ls, parent,
469                                                   name, mode);
470                 ls_device_put(env, ls);
471         }
472         return dto;
473 }
474 EXPORT_SYMBOL(local_file_find_or_create_with_fid);
475
476 static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
477 {
478         struct local_oid_storage *los, *ret = NULL;
479
480         cfs_list_for_each_entry(los, &ls->ls_los_list, los_list) {
481                 if (los->los_seq == seq) {
482                         cfs_atomic_inc(&los->los_refcount);
483                         ret = los;
484                         break;
485                 }
486         }
487         return ret;
488 }
489
490 /**
491  * Initialize local OID storage for required sequence.
492  * That may be needed for services that uses local files and requires
493  * dynamic OID allocation for them.
494  *
495  * Per each sequence we have an object with 'first_fid' identificator
496  * containing the counter for OIDs of locally created files with that
497  * sequence.
498  *
499  * It is used now by llog subsystem and MGS for NID tables
500  *
501  * Function gets first_fid to create counter object.
502  * All dynamic fids will be generated with the same sequence and incremented
503  * OIDs
504  *
505  * Returned local_oid_storage is in-memory representaion of OID storage
506  */
507 int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
508                            const struct lu_fid *first_fid,
509                            struct local_oid_storage **los)
510 {
511         struct dt_thread_info   *dti = dt_info(env);
512         struct ls_device        *ls;
513         struct los_ondisk        losd;
514         struct dt_object        *o;
515         struct dt_object        *root = NULL;
516         struct thandle          *th;
517         int                      rc;
518
519         ENTRY;
520
521         ls = ls_device_get(env, dev);
522         if (IS_ERR(ls))
523                 RETURN(PTR_ERR(ls));
524
525         cfs_mutex_lock(&ls->ls_los_mutex);
526         *los = dt_los_find(ls, fid_seq(first_fid));
527         if (*los != NULL)
528                 GOTO(out, rc = 0);
529
530         /* not found, then create */
531         OBD_ALLOC_PTR(*los);
532         if (*los == NULL)
533                 GOTO(out, rc = -ENOMEM);
534
535         cfs_atomic_set(&(*los)->los_refcount, 1);
536         cfs_mutex_init(&(*los)->los_id_lock);
537         (*los)->los_dev = &ls->ls_top_dev;
538         cfs_atomic_inc(&ls->ls_refcount);
539         cfs_list_add(&(*los)->los_list, &ls->ls_los_list);
540
541         /* initialize data allowing to generate new fids,
542          * literally we need a sequence */
543         o = ls_locate(env, ls, first_fid);
544         if (IS_ERR(o))
545                 GOTO(out_los, rc = PTR_ERR(o));
546
547         rc = dt_root_get(env, dev, &dti->dti_fid);
548         if (rc)
549                 GOTO(out_los, rc);
550
551         root = ls_locate(env, ls, &dti->dti_fid);
552         if (IS_ERR(root))
553                 GOTO(out_los, rc = PTR_ERR(root));
554
555         if (dt_try_as_dir(env, root) == 0)
556                 GOTO(out_los, rc = -ENOTDIR);
557
558         dt_write_lock(env, o, 0);
559         if (!dt_object_exists(o)) {
560                 th = dt_trans_create(env, dev);
561                 if (IS_ERR(th))
562                         GOTO(out_lock, rc = PTR_ERR(th));
563
564                 dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
565                 dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
566                 dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG);
567
568                 rc = dt_declare_create(env, o, &dti->dti_attr, NULL,
569                                        &dti->dti_dof, th);
570                 if (rc)
571                         GOTO(out_trans, rc);
572
573                 snprintf(dti->dti_buf, sizeof(dti->dti_buf),
574                         "seq-%Lx-lastid", fid_seq(first_fid));
575                 rc = dt_declare_insert(env, root,
576                                        (const struct dt_rec *)lu_object_fid(&o->do_lu),
577                                        (const struct dt_key *)dti->dti_buf,
578                                        th);
579                 if (rc)
580                         GOTO(out_trans, rc);
581
582                 dti->dti_lb.lb_buf = NULL;
583                 dti->dti_lb.lb_len = sizeof(dti->dti_lma);
584                 rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA,
585                                           0, th);
586                 if (rc)
587                         GOTO(out_trans, rc);
588
589                 rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
590                 if (rc)
591                         GOTO(out_trans, rc);
592
593                 rc = dt_trans_start_local(env, dev, th);
594                 if (rc)
595                         GOTO(out_trans, rc);
596
597                 LASSERT(!dt_object_exists(o));
598                 rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th);
599                 if (rc)
600                         GOTO(out_trans, rc);
601                 LASSERT(dt_object_exists(o));
602
603                 lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
604                 lustre_lma_swab(&dti->dti_lma);
605                 dti->dti_lb.lb_buf = &dti->dti_lma;
606                 dti->dti_lb.lb_len = sizeof(dti->dti_lma);
607                 rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0,
608                                   th, BYPASS_CAPA);
609                 if (rc)
610                         GOTO(out_trans, rc);
611
612                 losd.lso_magic = cpu_to_le32(LOS_MAGIC);
613                 losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
614
615                 dti->dti_off = 0;
616                 dti->dti_lb.lb_buf = &losd;
617                 dti->dti_lb.lb_len = sizeof(losd);
618                 rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
619                 if (rc)
620                         GOTO(out_trans, rc);
621                 rc = dt_insert(env, root,
622                                (const struct dt_rec *)lu_object_fid(&o->do_lu),
623                                (const struct dt_key *)dti->dti_buf, th,
624                                BYPASS_CAPA, 1);
625                 if (rc)
626                         GOTO(out_trans, rc);
627 out_trans:
628                 dt_trans_stop(env, dev, th);
629         } else {
630                 dti->dti_off = 0;
631                 dti->dti_lb.lb_buf = &losd;
632                 dti->dti_lb.lb_len = sizeof(losd);
633                 rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
634                 if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
635                         CERROR("local storage file "DFID" is corrupted\n",
636                                PFID(first_fid));
637                         rc = -EINVAL;
638                 }
639         }
640 out_lock:
641         dt_write_unlock(env, o);
642 out_los:
643         if (root)
644                 lu_object_put_nocache(env, &root->do_lu);
645         if (rc) {
646                 OBD_FREE_PTR(*los);
647                 *los = NULL;
648                 if (o)
649                         lu_object_put_nocache(env, &o->do_lu);
650         } else {
651                 (*los)->los_seq = fid_seq(first_fid);
652                 (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid);
653                 (*los)->los_obj = o;
654         }
655 out:
656         cfs_mutex_unlock(&ls->ls_los_mutex);
657         ls_device_put(env, ls);
658         return rc;
659 }
660 EXPORT_SYMBOL(local_oid_storage_init);
661
662 void local_oid_storage_fini(const struct lu_env *env,
663                             struct local_oid_storage *los)
664 {
665         struct ls_device *ls;
666
667         if (!cfs_atomic_dec_and_test(&los->los_refcount))
668                 return;
669
670         LASSERT(env);
671         LASSERT(los->los_dev);
672         ls = dt2ls_dev(los->los_dev);
673
674         cfs_mutex_lock(&ls->ls_los_mutex);
675         if (cfs_atomic_read(&los->los_refcount) == 0) {
676                 if (los->los_obj)
677                         lu_object_put_nocache(env, &los->los_obj->do_lu);
678                 cfs_list_del(&los->los_list);
679                 OBD_FREE_PTR(los);
680         }
681         cfs_mutex_unlock(&ls->ls_los_mutex);
682         ls_device_put(env, ls);
683 }
684 EXPORT_SYMBOL(local_oid_storage_fini);
685