Whamcloud - gitweb
LU-1302 llog: simplify llog_setup functionality
[fs/lustre-release.git] / lustre / obdclass / local_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012 Whamcloud, Inc.
24  */
25 /*
26  * lustre/obdclass/local_storage.c
27  *
28  * Local storage for file/objects with fid generation. Works on top of OSD.
29  *
30  * Author: Mikhail Pershin <mike.pershin@intel.com>
31  */
32
33 #define DEBUG_SUBSYSTEM S_CLASS
34
35 #include "local_storage.h"
36
37 /* all initialized local storages on this node are linked on this */
38 static CFS_LIST_HEAD(ls_list_head);
39 static CFS_DEFINE_MUTEX(ls_list_mutex);
40
41 static int ls_object_init(const struct lu_env *env, struct lu_object *o,
42                           const struct lu_object_conf *unused)
43 {
44         struct ls_device        *ls;
45         struct lu_object        *below;
46         struct lu_device        *under;
47
48         ENTRY;
49
50         ls = container_of0(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev);
51         under = &ls->ls_osd->dd_lu_dev;
52         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
53         if (below == NULL)
54                 RETURN(-ENOMEM);
55
56         lu_object_add(o, below);
57
58         RETURN(0);
59 }
60
61 static void ls_object_free(const struct lu_env *env, struct lu_object *o)
62 {
63         struct ls_object        *obj = lu2ls_obj(o);
64         struct lu_object_header *h = o->lo_header;
65
66         dt_object_fini(&obj->ls_obj);
67         lu_object_header_fini(h);
68         OBD_FREE_PTR(obj);
69 }
70
71 struct lu_object_operations ls_lu_obj_ops = {
72         .loo_object_init  = ls_object_init,
73         .loo_object_free  = ls_object_free,
74 };
75
76 struct lu_object *ls_object_alloc(const struct lu_env *env,
77                                   const struct lu_object_header *_h,
78                                   struct lu_device *d)
79 {
80         struct lu_object_header *h;
81         struct ls_object        *o;
82         struct lu_object        *l;
83
84         LASSERT(_h == NULL);
85
86         OBD_ALLOC_PTR(o);
87         if (o != NULL) {
88                 l = &o->ls_obj.do_lu;
89                 h = &o->ls_header;
90
91                 lu_object_header_init(h);
92                 dt_object_init(&o->ls_obj, h, d);
93                 lu_object_add_top(h, l);
94
95                 l->lo_ops = &ls_lu_obj_ops;
96
97                 return l;
98         } else {
99                 return NULL;
100         }
101 }
102
103 static struct lu_device_operations ls_lu_dev_ops = {
104         .ldo_object_alloc =     ls_object_alloc
105 };
106
107 static struct ls_device *__ls_find_dev(struct dt_device *dev)
108 {
109         struct ls_device *ls, *ret = NULL;
110
111         cfs_list_for_each_entry(ls, &ls_list_head, ls_linkage) {
112                 if (ls->ls_osd == dev) {
113                         cfs_atomic_inc(&ls->ls_refcount);
114                         ret = ls;
115                         break;
116                 }
117         }
118         return ret;
119 }
120
121 struct ls_device *ls_find_dev(struct dt_device *dev)
122 {
123         struct ls_device *ls;
124
125         cfs_mutex_lock(&ls_list_mutex);
126         ls = __ls_find_dev(dev);
127         cfs_mutex_unlock(&ls_list_mutex);
128
129         return ls;
130 }
131
132 static struct lu_device_type_operations ls_device_type_ops = {
133         .ldto_start = NULL,
134         .ldto_stop  = NULL,
135 };
136
137 static struct lu_device_type ls_lu_type = {
138         .ldt_name = "local_storage",
139         .ldt_ops  = &ls_device_type_ops,
140 };
141
142 static struct ls_device *ls_device_get(const struct lu_env *env,
143                                        struct dt_device *dev)
144 {
145         struct ls_device *ls;
146
147         ENTRY;
148
149         cfs_mutex_lock(&ls_list_mutex);
150         ls = __ls_find_dev(dev);
151         if (ls)
152                 GOTO(out_ls, ls);
153
154         /* not found, then create */
155         OBD_ALLOC_PTR(ls);
156         if (ls == NULL)
157                 GOTO(out_ls, ls = ERR_PTR(-ENOMEM));
158
159         cfs_atomic_set(&ls->ls_refcount, 1);
160         CFS_INIT_LIST_HEAD(&ls->ls_los_list);
161         cfs_mutex_init(&ls->ls_los_mutex);
162
163         ls->ls_osd = dev;
164
165         LASSERT(dev->dd_lu_dev.ld_site);
166         lu_device_init(&ls->ls_top_dev.dd_lu_dev, &ls_lu_type);
167         ls->ls_top_dev.dd_lu_dev.ld_ops = &ls_lu_dev_ops;
168         ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site;
169
170         /* finally add ls to the list */
171         cfs_list_add(&ls->ls_linkage, &ls_list_head);
172 out_ls:
173         cfs_mutex_unlock(&ls_list_mutex);
174         RETURN(ls);
175 }
176
177 static void ls_device_put(const struct lu_env *env, struct ls_device *ls)
178 {
179         LASSERT(env);
180         if (!cfs_atomic_dec_and_test(&ls->ls_refcount))
181                 return;
182
183         cfs_mutex_lock(&ls_list_mutex);
184         if (cfs_atomic_read(&ls->ls_refcount) == 0) {
185                 LASSERT(cfs_list_empty(&ls->ls_los_list));
186                 cfs_list_del(&ls->ls_linkage);
187                 lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0);
188                 lu_device_fini(&ls->ls_top_dev.dd_lu_dev);
189                 OBD_FREE_PTR(ls);
190         }
191         cfs_mutex_unlock(&ls_list_mutex);
192 }
193
194 /**
195  * local file fid generation
196  */
197 int local_object_fid_generate(const struct lu_env *env,
198                               struct local_oid_storage *los,
199                               struct lu_fid *fid)
200 {
201         LASSERT(los->los_dev);
202         LASSERT(los->los_obj);
203
204         /* take next OID */
205
206         /* to make it unique after reboot we store
207          * the latest generated fid atomically with
208          * object creation see local_object_create() */
209
210         cfs_mutex_lock(&los->los_id_lock);
211         fid->f_seq = los->los_seq;
212         fid->f_oid = los->los_last_oid++;
213         fid->f_ver = 0;
214         cfs_mutex_unlock(&los->los_id_lock);
215
216         return 0;
217 }
218
219 int local_object_declare_create(const struct lu_env *env,
220                                 struct local_oid_storage *los,
221                                 struct dt_object *o, struct lu_attr *attr,
222                                 struct dt_object_format *dof,
223                                 struct thandle *th)
224 {
225         struct dt_thread_info   *dti = dt_info(env);
226         int                      rc;
227
228         ENTRY;
229
230         /* update fid generation file */
231         if (los != NULL) {
232                 LASSERT(dt_object_exists(los->los_obj));
233                 rc = dt_declare_record_write(env, los->los_obj,
234                                              sizeof(struct los_ondisk), 0, th);
235                 if (rc)
236                         RETURN(rc);
237         }
238
239         rc = dt_declare_create(env, o, attr, NULL, dof, th);
240         if (rc)
241                 RETURN(rc);
242
243         dti->dti_lb.lb_buf = NULL;
244         dti->dti_lb.lb_len = sizeof(dti->dti_lma);
245         rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th);
246
247         RETURN(rc);
248 }
249
250 int local_object_create(const struct lu_env *env,
251                         struct local_oid_storage *los,
252                         struct dt_object *o, struct lu_attr *attr,
253                         struct dt_object_format *dof, struct thandle *th)
254 {
255         struct dt_thread_info   *dti = dt_info(env);
256         struct los_ondisk        losd;
257         int                      rc;
258
259         ENTRY;
260
261         rc = dt_create(env, o, attr, NULL, dof, th);
262         if (rc)
263                 RETURN(rc);
264
265         lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
266         lustre_lma_swab(&dti->dti_lma);
267         dti->dti_lb.lb_buf = &dti->dti_lma;
268         dti->dti_lb.lb_len = sizeof(dti->dti_lma);
269         rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0, th,
270                           BYPASS_CAPA);
271
272         if (los == NULL)
273                 RETURN(rc);
274
275         LASSERT(los->los_obj);
276         LASSERT(dt_object_exists(los->los_obj));
277
278         /* many threads can be updated this, serialize
279          * them here to avoid the race where one thread
280          * takes the value first, but writes it last */
281         cfs_mutex_lock(&los->los_id_lock);
282
283         /* update local oid number on disk so that
284          * we know the last one used after reboot */
285         losd.lso_magic = cpu_to_le32(LOS_MAGIC);
286         losd.lso_next_oid = cpu_to_le32(los->los_last_oid);
287
288         dti->dti_off = 0;
289         dti->dti_lb.lb_buf = &losd;
290         dti->dti_lb.lb_len = sizeof(losd);
291         rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
292                              th);
293         cfs_mutex_unlock(&los->los_id_lock);
294
295         RETURN(rc);
296 }
297
298 /*
299  * Create local named object (file, directory or index) in parent directory.
300  */
301 struct dt_object *__local_file_create(const struct lu_env *env,
302                                       const struct lu_fid *fid,
303                                       struct local_oid_storage *los,
304                                       struct ls_device *ls,
305                                       struct dt_object *parent,
306                                       const char *name, struct lu_attr *attr,
307                                       struct dt_object_format *dof)
308 {
309         struct dt_thread_info   *dti = dt_info(env);
310         struct dt_object        *dto;
311         struct thandle          *th;
312         int                      rc;
313
314         dto = ls_locate(env, ls, fid);
315         if (unlikely(IS_ERR(dto)))
316                 RETURN(dto);
317
318         LASSERT(dto != NULL);
319         if (dt_object_exists(dto))
320                 GOTO(out, rc = -EEXIST);
321
322         th = dt_trans_create(env, ls->ls_osd);
323         if (IS_ERR(th))
324                 GOTO(out, rc = PTR_ERR(th));
325
326         rc = local_object_declare_create(env, los, dto, attr, dof, th);
327         if (rc)
328                 GOTO(trans_stop, rc);
329
330         if (dti->dti_dof.dof_type == DFT_DIR) {
331                 dt_declare_ref_add(env, dto, th);
332                 dt_declare_ref_add(env, parent, th);
333         }
334
335         rc = dt_declare_insert(env, parent, (void *)fid, (void *)name, th);
336         if (rc)
337                 GOTO(trans_stop, rc);
338
339         rc = dt_trans_start_local(env, ls->ls_osd, th);
340         if (rc)
341                 GOTO(trans_stop, rc);
342
343         dt_write_lock(env, dto, 0);
344         if (dt_object_exists(dto))
345                 GOTO(unlock, rc = 0);
346
347         CDEBUG(D_OTHER, "create new object "DFID"\n",
348                PFID(lu_object_fid(&dto->do_lu)));
349         rc = local_object_create(env, los, dto, attr, dof, th);
350         if (rc)
351                 GOTO(unlock, rc);
352         LASSERT(dt_object_exists(dto));
353
354         if (dti->dti_dof.dof_type == DFT_DIR) {
355                 if (!dt_try_as_dir(env, dto))
356                         GOTO(destroy, rc = -ENOTDIR);
357                 /* Add "." and ".." for newly created dir */
358                 rc = dt_insert(env, dto, (void *)fid, (void *)".", th,
359                                BYPASS_CAPA, 1);
360                 if (rc)
361                         GOTO(destroy, rc);
362                 dt_ref_add(env, dto, th);
363                 rc = dt_insert(env, dto, (void *)lu_object_fid(&parent->do_lu),
364                                (void *)"..", th, BYPASS_CAPA, 1);
365                 if (rc)
366                         GOTO(destroy, rc);
367         }
368
369         dt_write_lock(env, parent, 0);
370         rc = dt_insert(env, parent, (const struct dt_rec *)fid,
371                        (const struct dt_key *)name, th, BYPASS_CAPA, 1);
372         if (dti->dti_dof.dof_type == DFT_DIR)
373                 dt_ref_add(env, parent, th);
374         dt_write_unlock(env, parent);
375         if (rc)
376                 GOTO(destroy, rc);
377 destroy:
378         if (rc)
379                 dt_destroy(env, dto, th);
380 unlock:
381         dt_write_unlock(env, dto);
382 trans_stop:
383         dt_trans_stop(env, ls->ls_osd, th);
384 out:
385         if (rc) {
386                 lu_object_put_nocache(env, &dto->do_lu);
387                 dto = ERR_PTR(rc);
388         } else {
389                 struct lu_fid dti_fid;
390                 /* since local files FIDs are not in OI the directory entry
391                  * is used to get inode number/generation, we need to do lookup
392                  * again to cache this data after create */
393                 rc = dt_lookup_dir(env, parent, name, &dti_fid);
394                 LASSERT(rc == 0);
395         }
396         RETURN(dto);
397 }
398
399 /*
400  * Look up and create (if it does not exist) a local named file or directory in
401  * parent directory.
402  */
403 struct dt_object *local_file_find_or_create(const struct lu_env *env,
404                                             struct local_oid_storage *los,
405                                             struct dt_object *parent,
406                                             const char *name, __u32 mode)
407 {
408         struct dt_thread_info   *dti = dt_info(env);
409         struct dt_object        *dto;
410         int                      rc;
411
412         LASSERT(parent);
413
414         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
415         if (rc == 0)
416                 /* name is found, get the object */
417                 dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
418         else if (rc != -ENOENT)
419                 dto = ERR_PTR(rc);
420         else {
421                 rc = local_object_fid_generate(env, los, &dti->dti_fid);
422                 if (rc < 0) {
423                         dto = ERR_PTR(rc);
424                 } else {
425                         /* create the object */
426                         dti->dti_attr.la_valid  = LA_MODE;
427                         dti->dti_attr.la_mode   = mode;
428                         dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
429                         dto = __local_file_create(env, &dti->dti_fid, los,
430                                                   dt2ls_dev(los->los_dev),
431                                                   parent, name, &dti->dti_attr,
432                                                   &dti->dti_dof);
433                 }
434         }
435         return dto;
436 }
437 EXPORT_SYMBOL(local_file_find_or_create);
438
439 struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
440                                                      struct dt_device *dt,
441                                                      const struct lu_fid *fid,
442                                                      struct dt_object *parent,
443                                                      const char *name,
444                                                      __u32 mode)
445 {
446         struct dt_thread_info   *dti = dt_info(env);
447         struct dt_object        *dto;
448         int                      rc;
449
450         LASSERT(parent);
451
452         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
453         if (rc == 0) {
454                 /* name is found, get the object */
455                 if (!lu_fid_eq(fid, &dti->dti_fid))
456                         dto = ERR_PTR(-EINVAL);
457                 else
458                         dto = dt_locate(env, dt, fid);
459         } else if (rc != -ENOENT) {
460                 dto = ERR_PTR(rc);
461         } else {
462                 struct ls_device *ls;
463
464                 ls = ls_device_get(env, dt);
465                 if (IS_ERR(ls)) {
466                         dto = ERR_PTR(PTR_ERR(ls));
467                 } else {
468                         /* create the object */
469                         dti->dti_attr.la_valid  = LA_MODE;
470                         dti->dti_attr.la_mode   = mode;
471                         dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
472                         dto = __local_file_create(env, fid, NULL, ls, parent,
473                                                   name, &dti->dti_attr,
474                                                   &dti->dti_dof);
475                         /* ls_device_put() will finalize the ls device, we
476                          * have to open the object in other device stack */
477                         if (!IS_ERR(dto)) {
478                                 dti->dti_fid = dto->do_lu.lo_header->loh_fid;
479                                 lu_object_put_nocache(env, &dto->do_lu);
480                                 dto = dt_locate(env, dt, &dti->dti_fid);
481                         }
482                         ls_device_put(env, ls);
483                 }
484         }
485         return dto;
486 }
487 EXPORT_SYMBOL(local_file_find_or_create_with_fid);
488
489 /*
490  * Look up and create (if it does not exist) a local named index file in parent
491  * directory.
492  */
493 struct dt_object *local_index_find_or_create(const struct lu_env *env,
494                                              struct local_oid_storage *los,
495                                              struct dt_object *parent,
496                                              const char *name, __u32 mode,
497                                              const struct dt_index_features *ft)
498 {
499         struct dt_thread_info   *dti = dt_info(env);
500         struct dt_object        *dto;
501         int                      rc;
502
503         LASSERT(parent);
504
505         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
506         if (rc == 0) {
507                 /* name is found, get the object */
508                 dto = ls_locate(env, dt2ls_dev(los->los_dev), &dti->dti_fid);
509         } else if (rc != -ENOENT) {
510                 dto = ERR_PTR(rc);
511         } else {
512                 rc = local_object_fid_generate(env, los, &dti->dti_fid);
513                 if (rc < 0) {
514                         dto = ERR_PTR(rc);
515                 } else {
516                         /* create the object */
517                         dti->dti_attr.la_valid          = LA_MODE;
518                         dti->dti_attr.la_mode           = mode;
519                         dti->dti_dof.dof_type           = DFT_INDEX;
520                         dti->dti_dof.u.dof_idx.di_feat  = ft;
521                         dto = __local_file_create(env, &dti->dti_fid, los,
522                                                   dt2ls_dev(los->los_dev),
523                                                   parent, name, &dti->dti_attr,
524                                                   &dti->dti_dof);
525                 }
526         }
527         return dto;
528
529 }
530 EXPORT_SYMBOL(local_index_find_or_create);
531
532 struct dt_object *
533 local_index_find_or_create_with_fid(const struct lu_env *env,
534                                     struct dt_device *dt,
535                                     const struct lu_fid *fid,
536                                     struct dt_object *parent,
537                                     const char *name, __u32 mode,
538                                     const struct dt_index_features *ft)
539 {
540         struct dt_thread_info   *dti = dt_info(env);
541         struct dt_object        *dto;
542         int                      rc;
543
544         LASSERT(parent);
545
546         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
547         if (rc == 0) {
548                 /* name is found, get the object */
549                 if (!lu_fid_eq(fid, &dti->dti_fid))
550                         dto = ERR_PTR(-EINVAL);
551                 else
552                         dto = dt_locate(env, dt, fid);
553         } else if (rc != -ENOENT) {
554                 dto = ERR_PTR(rc);
555         } else {
556                 struct ls_device *ls;
557
558                 ls = ls_device_get(env, dt);
559                 if (IS_ERR(ls)) {
560                         dto = ERR_PTR(PTR_ERR(ls));
561                 } else {
562                         /* create the object */
563                         dti->dti_attr.la_valid          = LA_MODE;
564                         dti->dti_attr.la_mode           = mode;
565                         dti->dti_dof.dof_type           = DFT_INDEX;
566                         dti->dti_dof.u.dof_idx.di_feat  = ft;
567                         dto = __local_file_create(env, fid, NULL, ls, parent,
568                                                   name, &dti->dti_attr,
569                                                   &dti->dti_dof);
570                         /* ls_device_put() will finalize the ls device, we
571                          * have to open the object in other device stack */
572                         if (!IS_ERR(dto)) {
573                                 dti->dti_fid = dto->do_lu.lo_header->loh_fid;
574                                 lu_object_put_nocache(env, &dto->do_lu);
575                                 dto = dt_locate(env, dt, &dti->dti_fid);
576                         }
577                         ls_device_put(env, ls);
578                 }
579         }
580         return dto;
581 }
582 EXPORT_SYMBOL(local_index_find_or_create_with_fid);
583
584 static struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
585 {
586         struct local_oid_storage *los, *ret = NULL;
587
588         cfs_list_for_each_entry(los, &ls->ls_los_list, los_list) {
589                 if (los->los_seq == seq) {
590                         cfs_atomic_inc(&los->los_refcount);
591                         ret = los;
592                         break;
593                 }
594         }
595         return ret;
596 }
597
598 /**
599  * Initialize local OID storage for required sequence.
600  * That may be needed for services that uses local files and requires
601  * dynamic OID allocation for them.
602  *
603  * Per each sequence we have an object with 'first_fid' identificator
604  * containing the counter for OIDs of locally created files with that
605  * sequence.
606  *
607  * It is used now by llog subsystem and MGS for NID tables
608  *
609  * Function gets first_fid to create counter object.
610  * All dynamic fids will be generated with the same sequence and incremented
611  * OIDs
612  *
613  * Returned local_oid_storage is in-memory representaion of OID storage
614  */
615 int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
616                            const struct lu_fid *first_fid,
617                            struct local_oid_storage **los)
618 {
619         struct dt_thread_info   *dti = dt_info(env);
620         struct ls_device        *ls;
621         struct los_ondisk        losd;
622         struct dt_object        *o;
623         struct dt_object        *root = NULL;
624         struct thandle          *th;
625         int                      rc;
626
627         ENTRY;
628
629         ls = ls_device_get(env, dev);
630         if (IS_ERR(ls))
631                 RETURN(PTR_ERR(ls));
632
633         cfs_mutex_lock(&ls->ls_los_mutex);
634         *los = dt_los_find(ls, fid_seq(first_fid));
635         if (*los != NULL)
636                 GOTO(out, rc = 0);
637
638         /* not found, then create */
639         OBD_ALLOC_PTR(*los);
640         if (*los == NULL)
641                 GOTO(out, rc = -ENOMEM);
642
643         cfs_atomic_set(&(*los)->los_refcount, 1);
644         cfs_mutex_init(&(*los)->los_id_lock);
645         (*los)->los_dev = &ls->ls_top_dev;
646         cfs_atomic_inc(&ls->ls_refcount);
647         cfs_list_add(&(*los)->los_list, &ls->ls_los_list);
648
649         /* initialize data allowing to generate new fids,
650          * literally we need a sequence */
651         o = ls_locate(env, ls, first_fid);
652         if (IS_ERR(o))
653                 GOTO(out_los, rc = PTR_ERR(o));
654
655         rc = dt_root_get(env, dev, &dti->dti_fid);
656         if (rc)
657                 GOTO(out_los, rc);
658
659         root = ls_locate(env, ls, &dti->dti_fid);
660         if (IS_ERR(root))
661                 GOTO(out_los, rc = PTR_ERR(root));
662
663         if (dt_try_as_dir(env, root) == 0)
664                 GOTO(out_los, rc = -ENOTDIR);
665
666         dt_write_lock(env, o, 0);
667         if (!dt_object_exists(o)) {
668                 th = dt_trans_create(env, dev);
669                 if (IS_ERR(th))
670                         GOTO(out_lock, rc = PTR_ERR(th));
671
672                 dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
673                 dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
674                 dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG);
675
676                 rc = dt_declare_create(env, o, &dti->dti_attr, NULL,
677                                        &dti->dti_dof, th);
678                 if (rc)
679                         GOTO(out_trans, rc);
680
681                 snprintf(dti->dti_buf, sizeof(dti->dti_buf),
682                         "seq-%Lx-lastid", fid_seq(first_fid));
683                 rc = dt_declare_insert(env, root,
684                                        (const struct dt_rec *)lu_object_fid(&o->do_lu),
685                                        (const struct dt_key *)dti->dti_buf,
686                                        th);
687                 if (rc)
688                         GOTO(out_trans, rc);
689
690                 dti->dti_lb.lb_buf = NULL;
691                 dti->dti_lb.lb_len = sizeof(dti->dti_lma);
692                 rc = dt_declare_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA,
693                                           0, th);
694                 if (rc)
695                         GOTO(out_trans, rc);
696
697                 rc = dt_declare_record_write(env, o, sizeof(losd), 0, th);
698                 if (rc)
699                         GOTO(out_trans, rc);
700
701                 rc = dt_trans_start_local(env, dev, th);
702                 if (rc)
703                         GOTO(out_trans, rc);
704
705                 LASSERT(!dt_object_exists(o));
706                 rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof, th);
707                 if (rc)
708                         GOTO(out_trans, rc);
709                 LASSERT(dt_object_exists(o));
710
711                 lustre_lma_init(&dti->dti_lma, lu_object_fid(&o->do_lu));
712                 lustre_lma_swab(&dti->dti_lma);
713                 dti->dti_lb.lb_buf = &dti->dti_lma;
714                 dti->dti_lb.lb_len = sizeof(dti->dti_lma);
715                 rc = dt_xattr_set(env, o, &dti->dti_lb, XATTR_NAME_LMA, 0,
716                                   th, BYPASS_CAPA);
717                 if (rc)
718                         GOTO(out_trans, rc);
719
720                 losd.lso_magic = cpu_to_le32(LOS_MAGIC);
721                 losd.lso_next_oid = cpu_to_le32(fid_oid(first_fid) + 1);
722
723                 dti->dti_off = 0;
724                 dti->dti_lb.lb_buf = &losd;
725                 dti->dti_lb.lb_len = sizeof(losd);
726                 rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
727                 if (rc)
728                         GOTO(out_trans, rc);
729                 rc = dt_insert(env, root,
730                                (const struct dt_rec *)lu_object_fid(&o->do_lu),
731                                (const struct dt_key *)dti->dti_buf, th,
732                                BYPASS_CAPA, 1);
733                 if (rc)
734                         GOTO(out_trans, rc);
735 out_trans:
736                 dt_trans_stop(env, dev, th);
737         } else {
738                 dti->dti_off = 0;
739                 dti->dti_lb.lb_buf = &losd;
740                 dti->dti_lb.lb_len = sizeof(losd);
741                 rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
742                 if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
743                         CERROR("local storage file "DFID" is corrupted\n",
744                                PFID(first_fid));
745                         rc = -EINVAL;
746                 }
747         }
748 out_lock:
749         dt_write_unlock(env, o);
750 out_los:
751         if (root)
752                 lu_object_put_nocache(env, &root->do_lu);
753         if (rc) {
754                 OBD_FREE_PTR(*los);
755                 *los = NULL;
756                 if (o)
757                         lu_object_put_nocache(env, &o->do_lu);
758         } else {
759                 (*los)->los_seq = fid_seq(first_fid);
760                 (*los)->los_last_oid = le32_to_cpu(losd.lso_next_oid);
761                 (*los)->los_obj = o;
762         }
763 out:
764         cfs_mutex_unlock(&ls->ls_los_mutex);
765         ls_device_put(env, ls);
766         return rc;
767 }
768 EXPORT_SYMBOL(local_oid_storage_init);
769
770 void local_oid_storage_fini(const struct lu_env *env,
771                             struct local_oid_storage *los)
772 {
773         struct ls_device *ls;
774
775         if (!cfs_atomic_dec_and_test(&los->los_refcount))
776                 return;
777
778         LASSERT(env);
779         LASSERT(los->los_dev);
780         ls = dt2ls_dev(los->los_dev);
781
782         cfs_mutex_lock(&ls->ls_los_mutex);
783         if (cfs_atomic_read(&los->los_refcount) == 0) {
784                 if (los->los_obj)
785                         lu_object_put_nocache(env, &los->los_obj->do_lu);
786                 cfs_list_del(&los->los_list);
787                 OBD_FREE_PTR(los);
788         }
789         cfs_mutex_unlock(&ls->ls_los_mutex);
790         ls_device_put(env, ls);
791 }
792 EXPORT_SYMBOL(local_oid_storage_fini);