Whamcloud - gitweb
LU-17000 utils: handle_yaml_no_op() has wrong signature
[fs/lustre-release.git] / lustre / obdclass / local_storage.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2012, 2017, Intel Corporation.
5  */
6
7 /*
8  * This file is part of Lustre, http://www.lustre.org/
9  *
10  * Local storage for file/objects with fid generation. Works on top of OSD.
11  *
12  * Author: Mikhail Pershin <mike.pershin@intel.com>
13  */
14
15 #define DEBUG_SUBSYSTEM S_CLASS
16
17 #include "local_storage.h"
18
/*
 * Node-wide registry of initialized local storage devices.
 * ls_list_mutex is held around lookups, insertions and removals on
 * ls_list_head.
 */
static DEFINE_MUTEX(ls_list_mutex);
static LIST_HEAD(ls_list_head);
22
23 static int ls_object_init(const struct lu_env *env, struct lu_object *o,
24                           const struct lu_object_conf *unused)
25 {
26         struct ls_device        *ls;
27         struct lu_object        *below;
28         struct lu_device        *under;
29
30         ENTRY;
31
32         ls = container_of(o->lo_dev, struct ls_device, ls_top_dev.dd_lu_dev);
33         under = &ls->ls_osd->dd_lu_dev;
34         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
35         if (below == NULL)
36                 RETURN(-ENOMEM);
37
38         lu_object_add(o, below);
39
40         RETURN(0);
41 }
42
43 static void ls_object_free(const struct lu_env *env, struct lu_object *o)
44 {
45         struct ls_object        *obj = lu2ls_obj(o);
46         struct lu_object_header *h = o->lo_header;
47
48         dt_object_fini(&obj->ls_obj);
49         lu_object_header_fini(h);
50         OBD_FREE_RCU(obj, sizeof(*obj), ls_header.loh_rcu);
51 }
52
53 static const struct lu_object_operations ls_lu_obj_ops = {
54         .loo_object_init  = ls_object_init,
55         .loo_object_free  = ls_object_free,
56 };
57
58 static struct lu_object *ls_object_alloc(const struct lu_env *env,
59                                          const struct lu_object_header *_h,
60                                          struct lu_device *d)
61 {
62         struct lu_object_header *h;
63         struct ls_object        *o;
64         struct lu_object        *l;
65
66         LASSERT(_h == NULL);
67
68         OBD_ALLOC_PTR(o);
69         if (o != NULL) {
70                 l = &o->ls_obj.do_lu;
71                 h = &o->ls_header;
72
73                 lu_object_header_init(h);
74                 dt_object_init(&o->ls_obj, h, d);
75                 lu_object_add_top(h, l);
76
77                 l->lo_ops = &ls_lu_obj_ops;
78
79                 return l;
80         } else {
81                 return NULL;
82         }
83 }
84
85 static const struct lu_device_operations ls_lu_dev_ops = {
86         .ldo_object_alloc =     ls_object_alloc
87 };
88
89 static const struct lu_device_type_operations ls_device_type_ops = {
90         .ldto_start = NULL,
91         .ldto_stop  = NULL,
92 };
93
94 static struct lu_device_type ls_lu_type = {
95         .ldt_name = "local_storage",
96         .ldt_ops  = &ls_device_type_ops,
97 };
98
99 static
100 struct ls_device *ls_device_init(struct dt_device *dev)
101 {
102         struct ls_device *ls;
103
104         ENTRY;
105
106         OBD_ALLOC_PTR(ls);
107         if (ls == NULL)
108                 GOTO(out_ls, ls = ERR_PTR(-ENOMEM));
109
110         kref_init(&ls->ls_refcount);
111         INIT_LIST_HEAD(&ls->ls_los_list);
112         mutex_init(&ls->ls_los_mutex);
113
114         ls->ls_osd = dev;
115
116         LASSERT(dev->dd_lu_dev.ld_site);
117         lu_device_init(&ls->ls_top_dev.dd_lu_dev, &ls_lu_type);
118         ls->ls_top_dev.dd_lu_dev.ld_ops = &ls_lu_dev_ops;
119         ls->ls_top_dev.dd_lu_dev.ld_site = dev->dd_lu_dev.ld_site;
120
121         /* finally add ls to the list */
122         list_add(&ls->ls_linkage, &ls_list_head);
123 out_ls:
124         RETURN(ls);
125 }
126
127 struct ls_device *ls_device_find_or_init(struct dt_device *dev)
128 {
129         struct ls_device *ls, *ret = NULL;
130
131         ENTRY;
132
133         mutex_lock(&ls_list_mutex);
134         /* find */
135         list_for_each_entry(ls, &ls_list_head, ls_linkage) {
136                 if (ls->ls_osd == dev) {
137                         kref_get(&ls->ls_refcount);
138                         ret = ls;
139                         break;
140                 }
141         }
142         /* found */
143         if (ret)
144                 GOTO(out_ls, ret);
145
146         /* not found, then create */
147         ls = ls_device_init(dev);
148 out_ls:
149         mutex_unlock(&ls_list_mutex);
150         RETURN(ls);
151 }
152
153 static void ls_device_put_free(struct kref *kref)
154 {
155         struct ls_device *ls = container_of(kref, struct ls_device,
156                                             ls_refcount);
157
158         LASSERT(list_empty(&ls->ls_los_list));
159         list_del(&ls->ls_linkage);
160         mutex_unlock(&ls_list_mutex);
161 }
162
163 void ls_device_put(const struct lu_env *env, struct ls_device *ls)
164 {
165         LASSERT(env);
166         if (kref_put_mutex(&ls->ls_refcount, ls_device_put_free,
167                            &ls_list_mutex)) {
168                 lu_site_purge(env, ls->ls_top_dev.dd_lu_dev.ld_site, ~0);
169                 lu_device_fini(&ls->ls_top_dev.dd_lu_dev);
170                 OBD_FREE_PTR(ls);
171         }
172 }
173
174 /**
175  * local file fid generation
176  */
177 int local_object_fid_generate(const struct lu_env *env,
178                               struct local_oid_storage *los,
179                               struct lu_fid *fid)
180 {
181         LASSERT(los->los_dev);
182         LASSERT(los->los_obj);
183
184         /* take next OID */
185
186         /* to make it unique after reboot we store
187          * the latest generated fid atomically with
188          * object creation see local_object_create() */
189
190         mutex_lock(&los->los_id_lock);
191         fid->f_seq = los->los_seq;
192         fid->f_oid = ++los->los_last_oid;
193         fid->f_ver = 0;
194         mutex_unlock(&los->los_id_lock);
195
196         return 0;
197 }
198
199 int local_object_declare_create(const struct lu_env *env,
200                                 struct local_oid_storage *los,
201                                 struct dt_object *o, struct lu_attr *attr,
202                                 struct dt_object_format *dof,
203                                 struct thandle *th)
204 {
205         struct dt_thread_info   *dti = dt_info(env);
206         int                      rc;
207
208         ENTRY;
209
210         /* update fid generation file */
211         if (los != NULL) {
212                 LASSERT(dt_object_exists(los->los_obj));
213                 dti->dti_lb.lb_buf = NULL;
214                 dti->dti_lb.lb_len = sizeof(struct los_ondisk);
215                 rc = dt_declare_record_write(env, los->los_obj,
216                                              &dti->dti_lb, 0, th);
217                 if (rc)
218                         RETURN(rc);
219         }
220
221         rc = dt_declare_create(env, o, attr, NULL, dof, th);
222         if (rc)
223                 RETURN(rc);
224
225         dti->dti_lb.lb_buf = NULL;
226         dti->dti_lb.lb_len = sizeof(dti->dti_lma);
227         rc = dt_declare_xattr_set(env, o, NULL, &dti->dti_lb, XATTR_NAME_LMA, 0,
228                                   th);
229
230         RETURN(rc);
231 }
232
233 int local_object_create(const struct lu_env *env,
234                         struct local_oid_storage *los,
235                         struct dt_object *o, struct lu_attr *attr,
236                         struct dt_object_format *dof, struct thandle *th)
237 {
238         struct dt_thread_info   *dti = dt_info(env);
239         u64                      lastid;
240         int                      rc;
241
242         ENTRY;
243
244         rc = dt_create(env, o, attr, NULL, dof, th);
245         if (rc)
246                 RETURN(rc);
247
248         if (los == NULL)
249                 RETURN(rc);
250
251         LASSERT(los->los_obj);
252         LASSERT(dt_object_exists(los->los_obj));
253
254         /* many threads can be updated this, serialize
255          * them here to avoid the race where one thread
256          * takes the value first, but writes it last */
257         mutex_lock(&los->los_id_lock);
258
259         /* update local oid number on disk so that
260          * we know the last one used after reboot */
261         lastid = cpu_to_le64(los->los_last_oid);
262
263         dti->dti_off = 0;
264         dti->dti_lb.lb_buf = &lastid;
265         dti->dti_lb.lb_len = sizeof(lastid);
266         rc = dt_record_write(env, los->los_obj, &dti->dti_lb, &dti->dti_off,
267                              th);
268         mutex_unlock(&los->los_id_lock);
269
270         RETURN(rc);
271 }
272
273 /*
274  * Create local named object (file, directory or index) in parent directory.
275  */
276 static struct dt_object *__local_file_create(const struct lu_env *env,
277                                              const struct lu_fid *fid,
278                                              struct local_oid_storage *los,
279                                              struct ls_device *ls,
280                                              struct dt_object *parent,
281                                              const char *name,
282                                              struct lu_attr *attr,
283                                              struct dt_object_format *dof)
284 {
285         struct dt_thread_info   *dti    = dt_info(env);
286         struct lu_object_conf   *conf   = &dti->dti_conf;
287         struct dt_insert_rec    *rec    = &dti->dti_dt_rec;
288         struct dt_object        *dto;
289         struct thandle          *th;
290         int                      rc;
291
292         /* We know that the target object does not exist, to be created,
293          * then give some hints - LOC_F_NEW to help low layer to handle
294          * that efficiently and properly. */
295         memset(conf, 0, sizeof(*conf));
296         conf->loc_flags = LOC_F_NEW;
297         dto = ls_locate(env, ls, fid, conf);
298         if (unlikely(IS_ERR(dto)))
299                 RETURN(dto);
300
301         LASSERT(dto != NULL);
302         if (dt_object_exists(dto))
303                 GOTO(out, rc = -EEXIST);
304
305         th = dt_trans_create(env, ls->ls_osd);
306         if (IS_ERR(th))
307                 GOTO(out, rc = PTR_ERR(th));
308
309         rc = local_object_declare_create(env, los, dto, attr, dof, th);
310         if (rc)
311                 GOTO(trans_stop, rc);
312
313         if (dti->dti_dof.dof_type == DFT_DIR) {
314                 rc = dt_declare_ref_add(env, dto, th);
315                 if (rc < 0)
316                         GOTO(trans_stop, rc);
317
318                 rc = dt_declare_ref_add(env, parent, th);
319                 if (rc < 0)
320                         GOTO(trans_stop, rc);
321         }
322
323         rec->rec_fid = fid;
324         rec->rec_type = attr->la_mode & S_IFMT;
325         rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
326                                (const struct dt_key *)name, th);
327         if (rc)
328                 GOTO(trans_stop, rc);
329
330         if (dti->dti_dof.dof_type == DFT_DIR) {
331                 if (!dt_try_as_dir(env, dto, false))
332                         GOTO(trans_stop, rc = -ENOTDIR);
333
334                 rec->rec_type = S_IFDIR;
335                 rec->rec_fid = fid;
336                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
337                                 (const struct dt_key *)".", th);
338                 if (rc != 0)
339                         GOTO(trans_stop, rc);
340
341                 rec->rec_fid = lu_object_fid(&parent->do_lu);
342                 rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
343                                 (const struct dt_key *)"..", th);
344                 if (rc != 0)
345                         GOTO(trans_stop, rc);
346
347                 rc = dt_declare_ref_add(env, dto, th);
348                 if (rc != 0)
349                         GOTO(trans_stop, rc);
350         }
351
352         rc = dt_trans_start_local(env, ls->ls_osd, th);
353         if (rc)
354                 GOTO(trans_stop, rc);
355
356         dt_write_lock(env, dto, DT_SRC_CHILD);
357         if (dt_object_exists(dto))
358                 GOTO(unlock, rc = 0);
359
360         CDEBUG(D_OTHER, "create new object "DFID"\n",
361                PFID(lu_object_fid(&dto->do_lu)));
362         rc = local_object_create(env, los, dto, attr, dof, th);
363         if (rc)
364                 GOTO(unlock, rc);
365         LASSERT(dt_object_exists(dto));
366
367         if (dti->dti_dof.dof_type == DFT_DIR) {
368
369                 rec->rec_type = S_IFDIR;
370                 rec->rec_fid = fid;
371                 /* Add "." and ".." for newly created dir */
372                 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
373                                (const struct dt_key *)".", th);
374                 if (rc != 0)
375                         GOTO(destroy, rc);
376
377                 dt_ref_add(env, dto, th);
378                 rec->rec_fid = lu_object_fid(&parent->do_lu);
379                 rc = dt_insert(env, dto, (const struct dt_rec *)rec,
380                                (const struct dt_key *)"..", th);
381                 if (rc != 0)
382                         GOTO(destroy, rc);
383         }
384
385         rec->rec_fid = fid;
386         rec->rec_type = dto->do_lu.lo_header->loh_attr;
387         dt_write_lock(env, parent, DT_SRC_PARENT);
388         rc = dt_insert(env, parent, (const struct dt_rec *)rec,
389                        (const struct dt_key *)name, th);
390         if (dti->dti_dof.dof_type == DFT_DIR)
391                 dt_ref_add(env, parent, th);
392         dt_write_unlock(env, parent);
393         if (rc)
394                 GOTO(destroy, rc);
395 destroy:
396         if (rc)
397                 dt_destroy(env, dto, th);
398 unlock:
399         dt_write_unlock(env, dto);
400 trans_stop:
401         dt_trans_stop(env, ls->ls_osd, th);
402 out:
403         if (rc) {
404                 dt_object_put_nocache(env, dto);
405                 dto = ERR_PTR(rc);
406         }
407         RETURN(dto);
408 }
409
410 struct dt_object *local_file_find(const struct lu_env *env,
411                                   struct local_oid_storage *los,
412                                   struct dt_object *parent,
413                                   const char *name)
414 {
415         struct dt_thread_info   *dti = dt_info(env);
416         struct dt_object        *dto;
417         int                      rc;
418
419         LASSERT(parent);
420
421         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
422         if (!rc)
423                 dto = ls_locate(env, dt2ls_dev(los->los_dev),
424                                 &dti->dti_fid, NULL);
425         else
426                 dto = ERR_PTR(rc);
427
428         return dto;
429 }
430 EXPORT_SYMBOL(local_file_find);
431
432 /*
433  * Look up and create (if it does not exist) a local named file or directory in
434  * parent directory.
435  */
436 struct dt_object *local_file_find_or_create(const struct lu_env *env,
437                                             struct local_oid_storage *los,
438                                             struct dt_object *parent,
439                                             const char *name, __u32 mode)
440 {
441         struct dt_thread_info   *dti = dt_info(env);
442         struct dt_object        *dto;
443         int                      rc;
444
445         dto = local_file_find(env, los, parent, name);
446         if (!IS_ERR(dto) || PTR_ERR(dto) != -ENOENT)
447                 return dto;
448
449         rc = local_object_fid_generate(env, los, &dti->dti_fid);
450         if (rc)
451                 return ERR_PTR(rc);
452
453         /* create the object */
454         dti->dti_attr.la_valid = LA_MODE;
455         dti->dti_attr.la_mode = mode;
456         dti->dti_dof.dof_type = dt_mode_to_dft(mode & S_IFMT);
457         dto = __local_file_create(env, &dti->dti_fid, los,
458                                   dt2ls_dev(los->los_dev), parent, name,
459                                   &dti->dti_attr, &dti->dti_dof);
460         return dto;
461 }
462 EXPORT_SYMBOL(local_file_find_or_create);
463
464 struct dt_object *local_file_find_or_create_with_fid(const struct lu_env *env,
465                                                      struct dt_device *dt,
466                                                      const struct lu_fid *fid,
467                                                      struct dt_object *parent,
468                                                      const char *name,
469                                                      __u32 mode)
470 {
471         struct dt_thread_info   *dti = dt_info(env);
472         struct dt_object        *dto;
473         int                      rc;
474
475         LASSERT(parent);
476
477         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
478         if (rc == 0) {
479                 dto = dt_locate(env, dt, &dti->dti_fid);
480         } else if (rc != -ENOENT) {
481                 dto = ERR_PTR(rc);
482         } else {
483                 struct ls_device *ls;
484
485                 ls = ls_device_find_or_init(dt);
486                 if (IS_ERR(ls)) {
487                         dto = ERR_CAST(ls);
488                 } else {
489                         /* create the object */
490                         dti->dti_attr.la_valid  = LA_MODE;
491                         dti->dti_attr.la_mode   = mode;
492                         dti->dti_dof.dof_type   = dt_mode_to_dft(mode & S_IFMT);
493                         dto = __local_file_create(env, fid, NULL, ls, parent,
494                                                   name, &dti->dti_attr,
495                                                   &dti->dti_dof);
496                         /* ls_device_put() will finalize the ls device, we
497                          * have to open the object in other device stack */
498                         if (!IS_ERR(dto)) {
499                                 dti->dti_fid = dto->do_lu.lo_header->loh_fid;
500                                 dt_object_put_nocache(env, dto);
501                                 dto = dt_locate(env, dt, &dti->dti_fid);
502                         }
503                         ls_device_put(env, ls);
504                 }
505         }
506         return dto;
507 }
508 EXPORT_SYMBOL(local_file_find_or_create_with_fid);
509
510 /*
511  * Look up and create (if it does not exist) a local named index file in parent
512  * directory.
513  */
514 struct dt_object *local_index_find_or_create(const struct lu_env *env,
515                                              struct local_oid_storage *los,
516                                              struct dt_object *parent,
517                                              const char *name, __u32 mode,
518                                              const struct dt_index_features *ft)
519 {
520         struct dt_thread_info   *dti = dt_info(env);
521         struct dt_object        *dto;
522         int                      rc;
523
524         LASSERT(parent);
525
526         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
527         if (rc == 0) {
528                 /* name is found, get the object */
529                 dto = ls_locate(env, dt2ls_dev(los->los_dev),
530                                 &dti->dti_fid, NULL);
531         } else if (rc != -ENOENT) {
532                 dto = ERR_PTR(rc);
533         } else {
534                 rc = local_object_fid_generate(env, los, &dti->dti_fid);
535                 if (rc < 0) {
536                         dto = ERR_PTR(rc);
537                 } else {
538                         /* create the object */
539                         dti->dti_attr.la_valid          = LA_MODE;
540                         dti->dti_attr.la_mode           = mode;
541                         dti->dti_dof.dof_type           = DFT_INDEX;
542                         dti->dti_dof.u.dof_idx.di_feat  = ft;
543                         dto = __local_file_create(env, &dti->dti_fid, los,
544                                                   dt2ls_dev(los->los_dev),
545                                                   parent, name, &dti->dti_attr,
546                                                   &dti->dti_dof);
547                 }
548         }
549         return dto;
550
551 }
552 EXPORT_SYMBOL(local_index_find_or_create);
553
554 struct dt_object *
555 local_index_find_or_create_with_fid(const struct lu_env *env,
556                                     struct dt_device *dt,
557                                     const struct lu_fid *fid,
558                                     struct dt_object *parent,
559                                     const char *name, __u32 mode,
560                                     const struct dt_index_features *ft)
561 {
562         struct dt_thread_info   *dti = dt_info(env);
563         struct dt_object        *dto;
564         int                      rc;
565
566         LASSERT(parent);
567
568         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
569         if (rc == 0) {
570                 /* name is found, get the object */
571                 if (!lu_fid_eq(fid, &dti->dti_fid))
572                         dto = ERR_PTR(-EINVAL);
573                 else
574                         dto = dt_locate(env, dt, fid);
575         } else if (rc != -ENOENT) {
576                 dto = ERR_PTR(rc);
577         } else {
578                 struct ls_device *ls;
579
580                 ls = ls_device_find_or_init(dt);
581                 if (IS_ERR(ls)) {
582                         dto = ERR_CAST(ls);
583                 } else {
584                         /* create the object */
585                         dti->dti_attr.la_valid          = LA_MODE;
586                         dti->dti_attr.la_mode           = mode;
587                         dti->dti_dof.dof_type           = DFT_INDEX;
588                         dti->dti_dof.u.dof_idx.di_feat  = ft;
589                         dto = __local_file_create(env, fid, NULL, ls, parent,
590                                                   name, &dti->dti_attr,
591                                                   &dti->dti_dof);
592                         /* ls_device_put() will finalize the ls device, we
593                          * have to open the object in other device stack */
594                         if (!IS_ERR(dto)) {
595                                 dti->dti_fid = dto->do_lu.lo_header->loh_fid;
596                                 dt_object_put_nocache(env, dto);
597                                 dto = dt_locate(env, dt, &dti->dti_fid);
598                         }
599                         ls_device_put(env, ls);
600                 }
601         }
602         return dto;
603 }
604 EXPORT_SYMBOL(local_index_find_or_create_with_fid);
605
606 static int local_object_declare_unlink(const struct lu_env *env,
607                                        struct dt_device *dt,
608                                        struct dt_object *p,
609                                        struct dt_object *c, const char *name,
610                                        struct thandle *th)
611 {
612         int rc;
613
614         rc = dt_declare_delete(env, p, (const struct dt_key *)name, th);
615         if (rc < 0)
616                 return rc;
617
618         if (S_ISDIR(p->do_lu.lo_header->loh_attr)) {
619                 rc = dt_declare_ref_del(env, p, th);
620                 if (rc < 0)
621                         return rc;
622         }
623
624         rc = dt_declare_ref_del(env, c, th);
625         if (rc < 0)
626                 return rc;
627
628         return dt_declare_destroy(env, c, th);
629 }
630
631 int local_object_unlink(const struct lu_env *env, struct dt_device *dt,
632                         struct dt_object *parent, const char *name)
633 {
634         struct dt_thread_info   *dti = dt_info(env);
635         struct dt_object        *dto;
636         struct thandle          *th;
637         int                      rc;
638
639         ENTRY;
640
641         rc = dt_lookup_dir(env, parent, name, &dti->dti_fid);
642         if (rc == -ENOENT)
643                 RETURN(0);
644         else if (rc < 0)
645                 RETURN(rc);
646
647         dto = dt_locate(env, dt, &dti->dti_fid);
648         if (unlikely(IS_ERR(dto)))
649                 RETURN(PTR_ERR(dto));
650
651         th = dt_trans_create(env, dt);
652         if (IS_ERR(th))
653                 GOTO(out, rc = PTR_ERR(th));
654
655         rc = local_object_declare_unlink(env, dt, parent, dto, name, th);
656         if (rc < 0)
657                 GOTO(stop, rc);
658
659         rc = dt_trans_start_local(env, dt, th);
660         if (rc < 0)
661                 GOTO(stop, rc);
662
663         if (S_ISDIR(dto->do_lu.lo_header->loh_attr)) {
664                 dt_write_lock(env, parent, 0);
665                 rc = dt_ref_del(env, parent, th);
666                 dt_write_unlock(env, parent);
667                 if (rc)
668                         GOTO(stop, rc);
669         }
670
671         dt_write_lock(env, dto, 0);
672         rc = dt_delete(env, parent, (struct dt_key *)name, th);
673         if (rc < 0)
674                 GOTO(unlock, rc);
675
676         rc = dt_ref_del(env, dto, th);
677         if (rc < 0) {
678                 struct dt_insert_rec *rec = &dti->dti_dt_rec;
679
680                 rec->rec_fid = &dti->dti_fid;
681                 rec->rec_type = dto->do_lu.lo_header->loh_attr;
682                 rc = dt_insert(env, parent, (const struct dt_rec *)rec,
683                                (const struct dt_key *)name, th);
684                 GOTO(unlock, rc);
685         }
686
687         rc = dt_destroy(env, dto, th);
688 unlock:
689         dt_write_unlock(env, dto);
690 stop:
691         dt_trans_stop(env, dt, th);
692 out:
693         dt_object_put_nocache(env, dto);
694         return rc;
695 }
696 EXPORT_SYMBOL(local_object_unlink);
697
698 struct local_oid_storage *dt_los_find(struct ls_device *ls, __u64 seq)
699 {
700         struct local_oid_storage *los, *ret = NULL;
701
702         list_for_each_entry(los, &ls->ls_los_list, los_list) {
703                 if (los->los_seq == seq) {
704                         atomic_inc(&los->los_refcount);
705                         ret = los;
706                         break;
707                 }
708         }
709         return ret;
710 }
711
712 void dt_los_put(struct local_oid_storage *los)
713 {
714         /* should never happen, only local_oid_storage_fini should
715          * drop refcount to zero
716          */
717         LASSERT(!atomic_dec_and_test(&los->los_refcount));
718 }
719
720 /* after Lustre 2.3 release there may be old file to store last generated FID
721  * If such file exists then we have to read its content
722  */
723 static int lastid_compat_check(const struct lu_env *env, struct dt_device *dev,
724                                __u64 lastid_seq, __u32 *first_oid,
725                                struct ls_device *ls)
726 {
727         struct dt_thread_info   *dti = dt_info(env);
728         struct dt_object        *root = NULL;
729         struct los_ondisk        losd;
730         struct dt_object        *o = NULL;
731         int                      rc = 0;
732
733         rc = dt_root_get(env, dev, &dti->dti_fid);
734         if (rc)
735                 return rc;
736
737         root = ls_locate(env, ls, &dti->dti_fid, NULL);
738         if (IS_ERR(root))
739                 return PTR_ERR(root);
740
741         /* find old last_id file */
742         snprintf(dti->dti_buf, sizeof(dti->dti_buf), "seq-%#llx-lastid",
743                  lastid_seq);
744         rc = dt_lookup_dir(env, root, dti->dti_buf, &dti->dti_fid);
745         dt_object_put_nocache(env, root);
746         if (rc == -ENOENT) {
747                 /* old llog lastid accessed by FID only */
748                 if (lastid_seq != FID_SEQ_LLOG)
749                         return 0;
750                 dti->dti_fid.f_seq = FID_SEQ_LLOG;
751                 dti->dti_fid.f_oid = 1;
752                 dti->dti_fid.f_ver = 0;
753                 o = ls_locate(env, ls, &dti->dti_fid, NULL);
754                 if (IS_ERR(o))
755                         return PTR_ERR(o);
756
757                 if (!dt_object_exists(o)) {
758                         dt_object_put_nocache(env, o);
759                         return 0;
760                 }
761                 CDEBUG(D_INFO, "Found old llog lastid file\n");
762         } else if (rc < 0) {
763                 return rc;
764         } else {
765                 CDEBUG(D_INFO, "Found old lastid file for sequence %#llx\n",
766                        lastid_seq);
767                 o = ls_locate(env, ls, &dti->dti_fid, NULL);
768                 if (IS_ERR(o))
769                         return PTR_ERR(o);
770         }
771         /* let's read seq-NNNNNN-lastid file value */
772         LASSERT(dt_object_exists(o));
773         dti->dti_off = 0;
774         dti->dti_lb.lb_buf = &losd;
775         dti->dti_lb.lb_len = sizeof(losd);
776         dt_read_lock(env, o, 0);
777         rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
778         dt_read_unlock(env, o);
779         if (rc == 0 && le32_to_cpu(losd.lso_magic) != LOS_MAGIC) {
780                 CERROR("%s: wrong content of seq-%#llx-lastid file, magic %x\n",
781                        o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq,
782                        le32_to_cpu(losd.lso_magic));
783                 rc = -EINVAL;
784         } else if (rc < 0) {
785                 CERROR("%s: failed to read seq-%#llx-lastid: rc = %d\n",
786                        o->do_lu.lo_dev->ld_obd->obd_name, lastid_seq, rc);
787         }
788         dt_object_put_nocache(env, o);
789         if (rc == 0)
790                 *first_oid = le32_to_cpu(losd.lso_next_oid);
791         return rc;
792 }
793
794
795 /**
796  * Initialize local OID storage for required sequence.
797  * That may be needed for services that uses local files and requires
798  * dynamic OID allocation for them.
799  *
800  * Per each sequence we have an object with 'first_fid' identificator
801  * containing the counter for OIDs of locally created files with that
802  * sequence.
803  *
804  * It is used now by llog subsystem and MGS for NID tables
805  *
806  * Function gets first_fid to create counter object.
807  * All dynamic fids will be generated with the same sequence and incremented
808  * OIDs
809  *
810  * Returned local_oid_storage is in-memory representaion of OID storage
811  */
812 int local_oid_storage_init(const struct lu_env *env, struct dt_device *dev,
813                            const struct lu_fid *first_fid,
814                            struct local_oid_storage **los)
815 {
816         struct dt_thread_info   *dti = dt_info(env);
817         struct ls_device        *ls;
818         u64                      lastid;
819         struct dt_object        *o = NULL;
820         struct thandle          *th;
821         __u32                    first_oid = fid_oid(first_fid);
822         int                      rc = 0;
823
824         ENTRY;
825
826         ls = ls_device_find_or_init(dev);
827         if (IS_ERR(ls))
828                 RETURN(PTR_ERR(ls));
829
830         mutex_lock(&ls->ls_los_mutex);
831         *los = dt_los_find(ls, fid_seq(first_fid));
832         if (*los != NULL)
833                 GOTO(out, rc = 0);
834
835         /* not found, then create */
836         OBD_ALLOC_PTR(*los);
837         if (*los == NULL)
838                 GOTO(out, rc = -ENOMEM);
839
840         atomic_set(&(*los)->los_refcount, 1);
841         mutex_init(&(*los)->los_id_lock);
842         (*los)->los_dev = &ls->ls_top_dev;
843         kref_get(&ls->ls_refcount);
844         list_add(&(*los)->los_list, &ls->ls_los_list);
845
846         /* Use {seq, 0, 0} to create the LAST_ID file for every
847          * sequence.  OIDs start at LUSTRE_FID_INIT_OID.
848          */
849         dti->dti_fid.f_seq = fid_seq(first_fid);
850         dti->dti_fid.f_oid = LUSTRE_FID_LASTID_OID;
851         dti->dti_fid.f_ver = 0;
852         o = ls_locate(env, ls, &dti->dti_fid, NULL);
853         if (IS_ERR(o))
854                 GOTO(out_los, rc = PTR_ERR(o));
855
856         if (!dt_object_exists(o)) {
857                 rc = lastid_compat_check(env, dev, fid_seq(first_fid),
858                                          &first_oid, ls);
859                 if (rc < 0)
860                         GOTO(out_los, rc);
861
862                 th = dt_trans_create(env, dev);
863                 if (IS_ERR(th))
864                         GOTO(out_los, rc = PTR_ERR(th));
865
866                 dti->dti_attr.la_valid = LA_MODE | LA_TYPE;
867                 dti->dti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
868                 dti->dti_dof.dof_type = dt_mode_to_dft(S_IFREG);
869
870                 rc = dt_declare_create(env, o, &dti->dti_attr, NULL,
871                                        &dti->dti_dof, th);
872                 if (rc)
873                         GOTO(out_trans, rc);
874
875                 lastid = cpu_to_le64(first_oid);
876
877                 dti->dti_off = 0;
878                 dti->dti_lb.lb_buf = &lastid;
879                 dti->dti_lb.lb_len = sizeof(lastid);
880                 rc = dt_declare_record_write(env, o, &dti->dti_lb, dti->dti_off,
881                                              th);
882                 if (rc)
883                         GOTO(out_trans, rc);
884
885                 rc = dt_trans_start_local(env, dev, th);
886                 if (rc)
887                         GOTO(out_trans, rc);
888
889                 dt_write_lock(env, o, 0);
890                 if (dt_object_exists(o))
891                         GOTO(out_lock, rc = 0);
892
893                 rc = dt_create(env, o, &dti->dti_attr, NULL, &dti->dti_dof,
894                                th);
895                 if (rc)
896                         GOTO(out_lock, rc);
897
898                 rc = dt_record_write(env, o, &dti->dti_lb, &dti->dti_off, th);
899                 if (rc)
900                         GOTO(out_lock, rc);
901 out_lock:
902                 dt_write_unlock(env, o);
903 out_trans:
904                 dt_trans_stop(env, dev, th);
905         } else {
906                 dti->dti_off = 0;
907                 dti->dti_lb.lb_buf = &lastid;
908                 dti->dti_lb.lb_len = sizeof(lastid);
909                 dt_read_lock(env, o, 0);
910                 rc = dt_record_read(env, o, &dti->dti_lb, &dti->dti_off);
911                 dt_read_unlock(env, o);
912                 if (rc == 0 && le64_to_cpu(lastid) > OBIF_MAX_OID) {
913                         CERROR("%s: bad oid %llu is read from LAST_ID\n",
914                                o->do_lu.lo_dev->ld_obd->obd_name,
915                                le64_to_cpu(lastid));
916                         rc = -EINVAL;
917                 }
918         }
919 out_los:
920         if (rc != 0) {
921                 list_del(&(*los)->los_list);
922                 ls_device_put(env, ls);
923                 OBD_FREE_PTR(*los);
924                 *los = NULL;
925                 if (o != NULL && !IS_ERR(o))
926                         dt_object_put_nocache(env, o);
927         } else {
928                 (*los)->los_seq = fid_seq(first_fid);
929                 (*los)->los_last_oid = le64_to_cpu(lastid);
930                 (*los)->los_obj = o;
931                 /* Read value should not be less than initial one
932                  * but possible after upgrade from older fs.
933                  * In this case just switch to the first_oid in memory and
934                  * it will be updated on disk with first object generated */
935                 if ((*los)->los_last_oid < first_oid)
936                         (*los)->los_last_oid = first_oid;
937         }
938 out:
939         mutex_unlock(&ls->ls_los_mutex);
940         ls_device_put(env, ls);
941         return rc;
942 }
943 EXPORT_SYMBOL(local_oid_storage_init);
944
945 void local_oid_storage_fini(const struct lu_env *env,
946                             struct local_oid_storage *los)
947 {
948         struct ls_device *ls;
949
950         LASSERT(env);
951         LASSERT(los->los_dev);
952         ls = dt2ls_dev(los->los_dev);
953
954         /* Take the mutex before decreasing the reference to avoid race
955          * conditions as described in LU-4721. */
956         mutex_lock(&ls->ls_los_mutex);
957         if (!atomic_dec_and_test(&los->los_refcount)) {
958                 mutex_unlock(&ls->ls_los_mutex);
959                 return;
960         }
961
962         if (los->los_obj)
963                 dt_object_put_nocache(env, los->los_obj);
964         list_del(&los->los_list);
965         OBD_FREE_PTR(los);
966         mutex_unlock(&ls->ls_los_mutex);
967         ls_device_put(env, ls);
968 }
969 EXPORT_SYMBOL(local_oid_storage_fini);