Whamcloud - gitweb
LU-5478 osd: get rid of obd_* typedefs
[fs/lustre-release.git] / lustre / osd-zfs / osd_oi.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * Copyright (c) 2012, 2013, Intel Corporation.
32  * Use is subject to license terms.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/osd-zfs/osd_oi.c
39  * OI functions to map fid to dnode
40  *
41  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
42  * Author: Mike Pershin <tappro@whamcloud.com>
43  * Author: Di Wang <di.wang@intel.com>
44  */
45
46 #define DEBUG_SUBSYSTEM S_OSD
47
48 #include <lustre_ver.h>
49 #include <libcfs/libcfs.h>
50 #include <obd_support.h>
51 #include <lustre_net.h>
52 #include <obd.h>
53 #include <obd_class.h>
54 #include <lustre_disk.h>
55 #include <lustre_fid.h>
56
57 #include "osd_internal.h"
58
59 #include <sys/dnode.h>
60 #include <sys/dbuf.h>
61 #include <sys/spa.h>
62 #include <sys/stat.h>
63 #include <sys/zap.h>
64 #include <sys/spa_impl.h>
65 #include <sys/zfs_znode.h>
66 #include <sys/dmu_tx.h>
67 #include <sys/dmu_objset.h>
68 #include <sys/dsl_prop.h>
69 #include <sys/sa_impl.h>
70 #include <sys/txg.h>
71
72 #define OSD_OI_FID_NR         (1UL << 7)
73 #define OSD_OI_FID_NR_MAX     (1UL << OSD_OI_FID_OID_BITS_MAX)
74 unsigned int osd_oi_count = OSD_OI_FID_NR;
75
76
77 /*
78  * zfs osd maintains names for known fids in the name hierarchy
79  * so that one can mount filesystem with regular ZFS stack and
80  * access files
81  */
82 struct named_oid {
83         unsigned long    oid;
84         char            *name;
85 };
86
87 static const struct named_oid oids[] = {
88         { LAST_RECV_OID,                LAST_RCVD },
89         { OFD_LAST_GROUP_OID,           "LAST_GROUP" },
90         { LLOG_CATALOGS_OID,            "CATALOGS" },
91         { MGS_CONFIGS_OID,              NULL /*MOUNT_CONFIGS_DIR*/ },
92         { FID_SEQ_SRV_OID,              "seq_srv" },
93         { FID_SEQ_CTL_OID,              "seq_ctl" },
94         { FLD_INDEX_OID,                "fld" },
95         { MDD_LOV_OBJ_OID,              LOV_OBJID },
96         { OFD_HEALTH_CHECK_OID,         HEALTH_CHECK },
97         { ACCT_USER_OID,                "acct_usr_inode" },
98         { ACCT_GROUP_OID,               "acct_grp_inode" },
99         { 0,                            NULL }
100 };
101
102 static char *oid2name(const unsigned long oid)
103 {
104         int i = 0;
105
106         while (oids[i].oid) {
107                 if (oids[i].oid == oid)
108                         return oids[i].name;
109                 i++;
110         }
111         return NULL;
112 }
113
114 /**
115  * Lookup an existing OI by the given name.
116  */
117 static int
118 osd_oi_lookup(const struct lu_env *env, struct osd_device *o,
119               uint64_t parent, const char *name, struct osd_oi *oi)
120 {
121         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
122         int                      rc;
123
124         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
125         if (rc)
126                 return rc;
127
128         rc = strlcpy(oi->oi_name, name, sizeof(oi->oi_name));
129         if (rc >= sizeof(oi->oi_name))
130                 return -E2BIG;
131
132         rc = 0;
133         oi->oi_zapid = zde->zde_dnode;
134
135         return rc;
136 }
137
138 /**
139  * Create a new OI with the given name.
140  */
141 static int
142 osd_oi_create(const struct lu_env *env, struct osd_device *o,
143               uint64_t parent, const char *name, uint64_t *child)
144 {
145         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
146         struct lu_attr          *la = &osd_oti_get(env)->oti_la;
147         dmu_buf_t               *db;
148         dmu_tx_t                *tx;
149         int                      rc;
150
151         /* verify it doesn't already exist */
152         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
153         if (rc == 0)
154                 return -EEXIST;
155
156         /* create fid-to-dnode index */
157         tx = dmu_tx_create(o->od_os);
158         if (tx == NULL)
159                 return -ENOMEM;
160
161         dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL);
162         dmu_tx_hold_bonus(tx, parent);
163         dmu_tx_hold_zap(tx, parent, TRUE, name);
164         LASSERT(tx->tx_objset->os_sa);
165         dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
166
167         rc = -dmu_tx_assign(tx, TXG_WAIT);
168         if (rc) {
169                 dmu_tx_abort(tx);
170                 return rc;
171         }
172
173         la->la_valid = LA_MODE | LA_UID | LA_GID;
174         la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
175         la->la_uid = la->la_gid = 0;
176         __osd_zap_create(env, o, &db, tx, la, parent, 0);
177
178         zde->zde_dnode = db->db_object;
179         zde->zde_pad = 0;
180         zde->zde_type = IFTODT(S_IFDIR);
181
182         rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx);
183
184         dmu_tx_commit(tx);
185
186         *child = db->db_object;
187         sa_buf_rele(db, osd_obj_tag);
188
189         return rc;
190 }
191
192 static int
193 osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o,
194                       uint64_t parent, const char *name, uint64_t *child)
195 {
196         struct osd_oi   oi;
197         int             rc;
198
199         rc = osd_oi_lookup(env, o, parent, name, &oi);
200         if (rc == 0)
201                 *child = oi.oi_zapid;
202         else if (rc == -ENOENT)
203                 rc = osd_oi_create(env, o, parent, name, child);
204
205         return rc;
206 }
207
208 /**
209  * Lookup the target index/flags of the fid, so it will know where
210  * the object is located (tgt index) and it is MDT or OST object.
211  */
212 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
213                    u64 seq, struct lu_seq_range *range)
214 {
215         struct seq_server_site  *ss = osd_seq_site(osd);
216
217         if (fid_seq_is_idif(seq)) {
218                 fld_range_set_ost(range);
219                 range->lsr_index = idif_ost_idx(seq);
220                 return 0;
221         }
222
223         if (!fid_seq_in_fldb(seq)) {
224                 fld_range_set_mdt(range);
225                 if (ss != NULL)
226                         /* FIXME: If ss is NULL, it suppose not get lsr_index
227                          * at all */
228                         range->lsr_index = ss->ss_node_id;
229                 return 0;
230         }
231
232         LASSERT(ss != NULL);
233         fld_range_set_any(range);
234         /* OSD will only do local fld lookup */
235         return fld_local_lookup(env, ss->ss_server_fld, seq, range);
236 }
237
238 int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
239                   const struct lu_fid *fid)
240 {
241         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
242         int                     rc;
243         ENTRY;
244
245         if (fid_is_idif(fid))
246                 RETURN(1);
247
248         if (unlikely(fid_is_local_file(fid) || fid_is_llog(fid)) ||
249                      fid_is_name_llog(fid) || fid_is_quota(fid))
250                 RETURN(0);
251
252         rc = osd_fld_lookup(env, osd, fid_seq(fid), range);
253         if (rc != 0) {
254                 if (rc != -ENOENT)
255                         CERROR("%s: "DFID" lookup failed: rc = %d\n",
256                                osd_name(osd), PFID(fid), rc);
257                 RETURN(0);
258         }
259
260         if (fld_range_is_ost(range))
261                 RETURN(1);
262
263         RETURN(0);
264 }
265
266 static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list,
267                                            u64 seq)
268 {
269         struct osd_seq *osd_seq;
270
271         list_for_each_entry(osd_seq, &seq_list->osl_seq_list, os_seq_list) {
272                 if (osd_seq->os_seq == seq)
273                         return osd_seq;
274         }
275         return NULL;
276 }
277
278 static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, u64 seq)
279 {
280         struct osd_seq *osd_seq;
281
282         read_lock(&seq_list->osl_seq_list_lock);
283         osd_seq = osd_seq_find_locked(seq_list, seq);
284         read_unlock(&seq_list->osl_seq_list_lock);
285
286         return osd_seq;
287 }
288
289 static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env,
290                                            struct osd_device *osd, u64 seq)
291 {
292         struct osd_seq_list     *seq_list = &osd->od_seq_list;
293         struct osd_seq          *osd_seq;
294         char                    *key = osd_oti_get(env)->oti_buf;
295         char                    *seq_name = osd_oti_get(env)->oti_str;
296         struct osd_oi           oi;
297         uint64_t                sdb, odb;
298         int                     i;
299         int                     rc = 0;
300         ENTRY;
301
302         osd_seq = osd_seq_find(seq_list, seq);
303         if (osd_seq != NULL)
304                 RETURN(osd_seq);
305
306         down(&seq_list->osl_seq_init_sem);
307         /* Check again, in case some one else already add it
308          * to the list */
309         osd_seq = osd_seq_find(seq_list, seq);
310         if (osd_seq != NULL)
311                 GOTO(out, rc = 0);
312
313         OBD_ALLOC_PTR(osd_seq);
314         if (osd_seq == NULL)
315                 GOTO(out, rc = -ENOMEM);
316
317         INIT_LIST_HEAD(&osd_seq->os_seq_list);
318         osd_seq->os_seq = seq;
319
320         /* Init subdir count to be 32, but each seq can have
321          * different subdir count */
322         osd_seq->os_subdir_count = OSD_OST_MAP_SIZE;
323         OBD_ALLOC(osd_seq->os_compat_dirs,
324                   sizeof(uint64_t) * osd_seq->os_subdir_count);
325         if (osd_seq->os_compat_dirs == NULL)
326                 GOTO(out, rc = -ENOMEM);
327
328         oi.oi_zapid = osd->od_O_id;
329         sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
330                 fid_seq_is_mdt0(seq)) ?  LPU64 : LPX64i,
331                 fid_seq_is_idif(seq) ? 0 : seq);
332
333         rc = osd_oi_find_or_create(env, osd, oi.oi_zapid, seq_name, &odb);
334         if (rc != 0) {
335                 CERROR("%s: Can not create %s : rc = %d\n",
336                        osd_name(osd), seq_name, rc);
337                 GOTO(out, rc);
338         }
339
340         for (i = 0; i < OSD_OST_MAP_SIZE; i++) {
341                 sprintf(key, "d%d", i);
342                 rc = osd_oi_find_or_create(env, osd, odb, key, &sdb);
343                 if (rc)
344                         GOTO(out, rc);
345                 osd_seq->os_compat_dirs[i] = sdb;
346         }
347
348         write_lock(&seq_list->osl_seq_list_lock);
349         list_add(&osd_seq->os_seq_list, &seq_list->osl_seq_list);
350         write_unlock(&seq_list->osl_seq_list_lock);
351 out:
352         up(&seq_list->osl_seq_init_sem);
353         if (rc != 0) {
354                 if (osd_seq != NULL && osd_seq->os_compat_dirs != NULL)
355                         OBD_FREE(osd_seq->os_compat_dirs,
356                                  sizeof(uint64_t) * osd_seq->os_subdir_count);
357                 if (osd_seq != NULL)
358                         OBD_FREE_PTR(osd_seq);
359                 osd_seq = ERR_PTR(rc);
360         }
361         RETURN(osd_seq);
362 }
363
364 /*
365  * objects w/o a natural reference (unlike a file on a MDS)
366  * are put under a special hierarchy /O/<seq>/d0..dXX
367  * this function returns a directory specific fid belongs to
368  */
369 static uint64_t
370 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
371                         const struct lu_fid *fid, char *buf)
372 {
373         struct osd_seq  *osd_seq;
374         unsigned long   b;
375         u64             id;
376         int             rc;
377
378         osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid));
379         if (IS_ERR(osd_seq)) {
380                 CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd),
381                        PFID(fid));
382                 return PTR_ERR(osd_seq);
383         }
384
385         if (fid_is_last_id(fid)) {
386                 id = 0;
387         } else {
388                 rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid);
389                 LASSERT(rc == 0); /* we should not get here with IGIF */
390                 id = ostid_id(&osd_oti_get(env)->oti_ostid);
391         }
392
393         b = id % OSD_OST_MAP_SIZE;
394         LASSERT(osd_seq->os_compat_dirs[b]);
395
396         sprintf(buf, LPU64, id);
397
398         return osd_seq->os_compat_dirs[b];
399 }
400
401 /* XXX: f_ver is not counted, but may differ too */
402 static void osd_fid2str(char *buf, const struct lu_fid *fid)
403 {
404         sprintf(buf, DFID_NOBRACE, PFID(fid));
405 }
406
407 /*
408  * Determine the zap object id which is being used as the OI for the
409  * given fid.  The lowest N bits in the sequence ID are used as the
410  * index key.  On failure 0 is returned which zfs treats internally
411  * as an invalid object id.
412  */
413 static uint64_t
414 osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
415                     char *buf)
416 {
417         struct osd_oi *oi;
418
419         LASSERT(osd->od_oi_table != NULL);
420         oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
421         osd_fid2str(buf, fid);
422
423         return oi->oi_zapid;
424 }
425
426 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
427                             const struct lu_fid *fid, char *buf)
428 {
429         uint64_t zapid;
430
431         LASSERT(fid);
432         LASSERT(buf);
433
434         if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) {
435                 zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf);
436         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
437                 /* special objects with fixed known fids get their name */
438                 char *name = oid2name(fid_oid(fid));
439
440                 if (name) {
441                         zapid = osd->od_root;
442                         strcpy(buf, name);
443                         if (fid_is_acct(fid))
444                                 zapid = MASTER_NODE_OBJ;
445                 } else {
446                         zapid = osd_get_idx_for_fid(osd, fid, buf);
447                 }
448         } else {
449                 zapid = osd_get_idx_for_fid(osd, fid, buf);
450         }
451
452         return zapid;
453 }
454
455 static inline int fid_is_fs_root(const struct lu_fid *fid)
456 {
457         /* Map root inode to special local object FID */
458         return fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
459                 fid_oid(fid) == OSD_FS_ROOT_OID;
460 }
461
462 int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
463                    const struct lu_fid *fid, uint64_t *oid)
464 {
465         struct osd_thread_info  *info = osd_oti_get(env);
466         char                    *buf = info->oti_buf;
467         uint64_t                zapid;
468         int                     rc = 0;
469         ENTRY;
470
471         if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
472                 RETURN(-ENOENT);
473
474         if (unlikely(fid_is_acct(fid))) {
475                 if (fid_oid(fid) == ACCT_USER_OID)
476                         *oid = dev->od_iusr_oid;
477                 else
478                         *oid = dev->od_igrp_oid;
479         } else if (unlikely(fid_is_fs_root(fid))) {
480                 *oid = dev->od_root;
481         } else {
482                 zapid = osd_get_name_n_idx(env, dev, fid, buf);
483
484                 rc = -zap_lookup(dev->od_os, zapid, buf,
485                                 8, 1, &info->oti_zde);
486                 if (rc)
487                         RETURN(rc);
488                 *oid = info->oti_zde.lzd_reg.zde_dnode;
489         }
490
491         if (rc == 0)
492                 dmu_prefetch(dev->od_os, *oid, 0, 0);
493
494         RETURN(rc);
495 }
496
497 /**
498  * Close an entry in a specific slot.
499  */
500 static void
501 osd_oi_remove_table(const struct lu_env *env, struct osd_device *o, int key)
502 {
503         struct osd_oi *oi;
504
505         LASSERT(key < o->od_oi_count);
506
507         oi = o->od_oi_table[key];
508         if (oi) {
509                 OBD_FREE_PTR(oi);
510                 o->od_oi_table[key] = NULL;
511         }
512 }
513
514 /**
515  * Allocate and open a new entry in the specified unused slot.
516  */
517 static int
518 osd_oi_add_table(const struct lu_env *env, struct osd_device *o,
519                  char *name, int key)
520 {
521         struct osd_oi *oi;
522         int rc;
523
524         LASSERT(key < o->od_oi_count);
525         LASSERT(o->od_oi_table[key] == NULL);
526
527         OBD_ALLOC_PTR(oi);
528         if (oi == NULL)
529                 return -ENOMEM;
530
531         rc = osd_oi_lookup(env, o, o->od_root, name, oi);
532         if (rc) {
533                 OBD_FREE_PTR(oi);
534                 return rc;
535         }
536
537         o->od_oi_table[key] = oi;
538
539         return 0;
540 }
541
542 /**
543  * Depopulate the OI table.
544  */
545 static void
546 osd_oi_close_table(const struct lu_env *env, struct osd_device *o)
547 {
548         int i;
549
550         for (i = 0; i < o->od_oi_count; i++)
551                 osd_oi_remove_table(env, o, i);
552 }
553
554 /**
555  * Populate the OI table based.
556  */
557 static int
558 osd_oi_open_table(const struct lu_env *env, struct osd_device *o, int count)
559 {
560         char name[16];
561         int  i, rc = 0;
562         ENTRY;
563
564         for (i = 0; i < count; i++) {
565                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
566                 rc = osd_oi_add_table(env, o, name, i);
567                 if (rc) {
568                         osd_oi_close_table(env, o);
569                         break;
570                 }
571         }
572
573         RETURN(rc);
574 }
575
576 /**
577  * Determine if the type and number of OIs used by this file system.
578  */
579 static int
580 osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count)
581 {
582         uint64_t        root_oid = o->od_root;
583         struct osd_oi   oi;
584         char            name[16];
585         int             rc;
586         ENTRY;
587
588         /*
589          * Check for multiple OIs and determine the count.  There is no
590          * gap handling, if an OI is missing the wrong size can be returned.
591          * The only safeguard is that we know the number of OIs must be a
592          * power of two and this is checked for basic sanity.
593          */
594         for (*count = 0; *count < OSD_OI_FID_NR_MAX; (*count)++) {
595                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, *count);
596                 rc = osd_oi_lookup(env, o, root_oid, name, &oi);
597                 if (rc == 0)
598                         continue;
599
600                 if (rc == -ENOENT) {
601                         if (*count == 0)
602                                 break;
603
604                         if ((*count & (*count - 1)) != 0)
605                                 RETURN(-EDOM);
606
607                         RETURN(0);
608                 }
609
610                 RETURN(rc);
611         }
612
613         /*
614          * No OIs exist, this must be a new filesystem.
615          */
616         *count = 0;
617
618         RETURN(0);
619 }
620
621 static void osd_ost_seq_init(const struct lu_env *env, struct osd_device *osd)
622 {
623         struct osd_seq_list *osl = &osd->od_seq_list;
624
625         INIT_LIST_HEAD(&osl->osl_seq_list);
626         rwlock_init(&osl->osl_seq_list_lock);
627         sema_init(&osl->osl_seq_init_sem, 1);
628 }
629
630 static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd)
631 {
632         struct osd_seq_list     *osl = &osd->od_seq_list;
633         struct osd_seq          *osd_seq, *tmp;
634
635         write_lock(&osl->osl_seq_list_lock);
636         list_for_each_entry_safe(osd_seq, tmp, &osl->osl_seq_list,
637                                  os_seq_list) {
638                 list_del(&osd_seq->os_seq_list);
639                 OBD_FREE(osd_seq->os_compat_dirs,
640                          sizeof(uint64_t) * osd_seq->os_subdir_count);
641                 OBD_FREE(osd_seq, sizeof(*osd_seq));
642         }
643         write_unlock(&osl->osl_seq_list_lock);
644
645         return;
646 }
647
648 /**
649  * Create /O subdirectory to map legacy OST objects for compatibility.
650  */
651 static int
652 osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
653 {
654         uint64_t         odb, sdb;
655         int              rc;
656         ENTRY;
657
658         rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb);
659         if (rc)
660                 RETURN(rc);
661
662         o->od_O_id = sdb;
663
664         osd_ost_seq_init(env, o);
665         /* Create on-disk indexes to maintain per-UID/GID inode usage.
666          * Those new indexes are created in the top-level ZAP outside the
667          * namespace in order not to confuse ZPL which might interpret those
668          * indexes as directories and assume the values are object IDs */
669         rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ,
670                         oid2name(ACCT_USER_OID), &odb);
671         if (rc)
672                 RETURN(rc);
673         o->od_iusr_oid = odb;
674
675         rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ,
676                         oid2name(ACCT_GROUP_OID), &odb);
677         if (rc)
678                 RETURN(rc);
679         o->od_igrp_oid = odb;
680
681         RETURN(rc);
682 }
683
684 static char *root2convert = "ROOT";
685 /*
686  * due to DNE requirements we have to change sequence of /ROOT object
687  * so that it doesn't belong to the local sequence FID_SEQ_LOCAL_FILE
688  * but a normal sequence living on MDS#0
689  * this is the sole purpose of this function.
690  *
691  * This is only needed for pre-production 2.4 ZFS filesystems, and
692  * can be removed in the future.
693  */
694 int osd_convert_root_to_new_seq(const struct lu_env *env,
695                                         struct osd_device *o)
696 {
697         struct luz_direntry *lze = &osd_oti_get(env)->oti_zde;
698         char                *buf = osd_oti_get(env)->oti_str;
699         struct lu_fid        newfid;
700         uint64_t             zapid;
701         dmu_tx_t            *tx = NULL;
702         int                  rc;
703         ENTRY;
704
705         /* ignore OSTs */
706         if (strstr(o->od_svname, "MDT") == NULL)
707                 RETURN(0);
708
709         /* lookup /ROOT */
710         rc = -zap_lookup(o->od_os, o->od_root, root2convert, 8,
711                          sizeof(*lze) / 8, (void *)lze);
712         /* doesn't exist or let actual user to handle the error */
713         if (rc)
714                 RETURN(0);
715
716         CDEBUG(D_OTHER, "%s: /ROOT -> "DFID" -> "LPU64"\n", o->od_svname,
717                PFID(&lze->lzd_fid), (long long int) lze->lzd_reg.zde_dnode);
718
719         /* already right one? */
720         if (fid_seq(&lze->lzd_fid) == FID_SEQ_ROOT)
721                 return 0;
722
723         tx = dmu_tx_create(o->od_os);
724         if (tx == NULL)
725                 return -ENOMEM;
726
727         dmu_tx_hold_bonus(tx, o->od_root);
728
729         /* declare delete/insert of the name */
730         dmu_tx_hold_zap(tx, o->od_root, TRUE, root2convert);
731         dmu_tx_hold_zap(tx, o->od_root, FALSE, root2convert);
732
733         /* declare that we'll remove object from fid-dnode mapping */
734         zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf);
735         dmu_tx_hold_bonus(tx, zapid);
736         dmu_tx_hold_zap(tx, zapid, FALSE, buf);
737
738         /* declare that we'll add object to fid-dnode mapping */
739         newfid.f_seq = FID_SEQ_ROOT;
740         newfid.f_oid = 1;
741         newfid.f_ver = 0;
742         zapid = osd_get_name_n_idx(env, o, &newfid, buf);
743         dmu_tx_hold_bonus(tx, zapid);
744         dmu_tx_hold_zap(tx, zapid, TRUE, buf);
745
746         rc = -dmu_tx_assign(tx, TXG_WAIT);
747         if (rc)
748                 GOTO(err, rc);
749
750         rc = -zap_remove(o->od_os, o->od_root, root2convert, tx);
751         if (rc)
752                 GOTO(err, rc);
753
754         /* remove from OI */
755         zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf);
756         rc = -zap_remove(o->od_os, zapid, buf, tx);
757         if (rc)
758                 GOTO(err, rc);
759
760         lze->lzd_fid = newfid;
761         rc = -zap_add(o->od_os, o->od_root, root2convert,
762                       8, sizeof(*lze) / 8, (void *)lze, tx);
763         if (rc)
764                 GOTO(err, rc);
765
766         /* add to OI with the new fid */
767         zapid = osd_get_name_n_idx(env, o, &newfid, buf);
768         rc = -zap_add(o->od_os, zapid, buf, 8, 1, &lze->lzd_reg, tx);
769         if (rc)
770                 GOTO(err, rc);
771
772
773         /* LMA will be updated in mdd_compat_fixes */
774         dmu_tx_commit(tx);
775
776         RETURN(rc);
777
778 err:
779         if (tx)
780                 dmu_tx_abort(tx);
781         CERROR("%s: can't convert to new fid: rc = %d\n", o->od_svname, rc);
782         RETURN(rc);
783 }
784
785 /**
786  * Initialize the OIs by either opening or creating them as needed.
787  */
788 int osd_oi_init(const struct lu_env *env, struct osd_device *o)
789 {
790         char    *key = osd_oti_get(env)->oti_buf;
791         int      i, rc, count = 0;
792         ENTRY;
793
794         rc = osd_oi_probe(env, o, &count);
795         if (rc)
796                 RETURN(rc);
797
798         if (count == 0) {
799                 uint64_t odb, sdb;
800
801                 count = osd_oi_count;
802                 odb = o->od_root;
803
804                 for (i = 0; i < count; i++) {
805                         sprintf(key, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
806                         rc = osd_oi_find_or_create(env, o, odb, key, &sdb);
807                         if (rc)
808                                 RETURN(rc);
809                 }
810         }
811
812         rc = osd_oi_init_compat(env, o);
813         if (rc)
814                 RETURN(rc);
815
816         LASSERT((count & (count - 1)) == 0);
817         o->od_oi_count = count;
818         OBD_ALLOC(o->od_oi_table, sizeof(struct osd_oi *) * count);
819         if (o->od_oi_table == NULL)
820                 RETURN(-ENOMEM);
821
822         rc = osd_oi_open_table(env, o, count);
823         if (rc) {
824                 OBD_FREE(o->od_oi_table, sizeof(struct osd_oi *) * count);
825                 o->od_oi_table = NULL;
826         }
827
828         RETURN(rc);
829 }
830
831 void osd_oi_fini(const struct lu_env *env, struct osd_device *o)
832 {
833         ENTRY;
834
835         osd_ost_seq_fini(env, o);
836
837         if (o->od_oi_table != NULL) {
838                 (void) osd_oi_close_table(env, o);
839                 OBD_FREE(o->od_oi_table,
840                          sizeof(struct osd_oi *) * o->od_oi_count);
841                 o->od_oi_table = NULL;
842                 o->od_oi_count = 0;
843         }
844
845         EXIT;
846 }
847
848 int osd_options_init(void)
849 {
850         /* osd_oi_count - Default number of OIs, 128 works well for ZFS */
851         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
852                 osd_oi_count = OSD_OI_FID_NR;
853
854         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
855                 LCONSOLE_WARN("Round up osd_oi_count %d to power2 %d\n",
856                         osd_oi_count, size_roundup_power2(osd_oi_count));
857                 osd_oi_count = size_roundup_power2(osd_oi_count);
858         }
859
860         return 0;
861 }