Whamcloud - gitweb
LU-10186 osd-zfs: move LAST_ID OI mapping out of oi.xx
[fs/lustre-release.git] / lustre / osd-zfs / osd_oi.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_oi.c
33  * OI functions to map fid to dnode
34  *
35  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
36  * Author: Mike Pershin <tappro@whamcloud.com>
37  * Author: Di Wang <di.wang@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_OSD
41
42 #include <libcfs/libcfs.h>
43 #include <obd_support.h>
44 #include <lustre_net.h>
45 #include <obd.h>
46 #include <obd_class.h>
47 #include <lustre_disk.h>
48 #include <lustre_fid.h>
49
50 #include "osd_internal.h"
51
52 #include <sys/dnode.h>
53 #include <sys/dbuf.h>
54 #include <sys/spa.h>
55 #include <sys/stat.h>
56 #include <sys/zap.h>
57 #include <sys/spa_impl.h>
58 #include <sys/zfs_znode.h>
59 #include <sys/dmu_tx.h>
60 #include <sys/dmu_objset.h>
61 #include <sys/dsl_prop.h>
62 #include <sys/sa_impl.h>
63 #include <sys/txg.h>
64 #include <lustre_scrub.h>
65
66 #define OSD_OI_FID_NR         (1UL << 7)
67 unsigned int osd_oi_count = OSD_OI_FID_NR;
68
69
70 /*
71  * zfs osd maintains names for known fids in the name hierarchy
72  * so that one can mount filesystem with regular ZFS stack and
73  * access files
74  */
75 struct named_oid {
76         unsigned long    oid;
77         char            *name;
78 };
79
80 static const struct named_oid oids[] = {
81         { .oid = LAST_RECV_OID,        .name = LAST_RCVD },
82         { .oid = OFD_LAST_GROUP_OID,   .name = "LAST_GROUP" },
83         { .oid = LLOG_CATALOGS_OID,    .name = "CATALOGS" },
84         { .oid = MGS_CONFIGS_OID,      /*MOUNT_CONFIGS_DIR*/ },
85         { .oid = FID_SEQ_SRV_OID,      .name = "seq_srv" },
86         { .oid = FID_SEQ_CTL_OID,      .name = "seq_ctl" },
87         { .oid = FLD_INDEX_OID,        .name = "fld" },
88         { .oid = MDD_LOV_OBJ_OID,      .name = LOV_OBJID },
89         { .oid = OFD_HEALTH_CHECK_OID, .name = HEALTH_CHECK },
90         { .oid = REPLY_DATA_OID,       .name = REPLY_DATA },
91         { .oid = MDD_LOV_OBJ_OSEQ,     .name = LOV_OBJSEQ },
92         { .oid = BATCHID_COMMITTED_OID, .name = "BATCHID" },
93         { .oid = 0 }
94 };
95
96 static inline bool fid_is_objseq(const struct lu_fid *fid)
97 {
98         return fid->f_seq == FID_SEQ_LOCAL_FILE &&
99                 fid->f_oid == MDD_LOV_OBJ_OSEQ;
100 }
101
102 static inline bool fid_is_batchid(const struct lu_fid *fid)
103 {
104         return fid->f_seq == FID_SEQ_LOCAL_FILE &&
105                 fid->f_oid == BATCHID_COMMITTED_OID;
106 }
107
108 static char *oid2name(const unsigned long oid)
109 {
110         int i = 0;
111
112         while (oids[i].oid) {
113                 if (oids[i].oid == oid)
114                         return oids[i].name;
115                 i++;
116         }
117         return NULL;
118 }
119
120 /**
121  * Lookup an existing OI by the given name.
122  */
123 static int
124 osd_oi_lookup(const struct lu_env *env, struct osd_device *o,
125               uint64_t parent, const char *name, struct osd_oi *oi)
126 {
127         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
128         int                      rc;
129
130         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
131         if (rc)
132                 return rc;
133
134         rc = strlcpy(oi->oi_name, name, sizeof(oi->oi_name));
135         if (rc >= sizeof(oi->oi_name))
136                 return -E2BIG;
137
138         oi->oi_zapid = zde->zde_dnode;
139
140         return 0;
141 }
142
143 /**
144  * Create a new OI with the given name.
145  */
146 static int
147 osd_oi_create(const struct lu_env *env, struct osd_device *o,
148               uint64_t parent, const char *name, uint64_t *child)
149 {
150         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
151         struct lu_attr          *la = &osd_oti_get(env)->oti_la;
152         sa_handle_t             *sa_hdl = NULL;
153         dmu_tx_t                *tx;
154         uint64_t                 oid;
155         int                      rc;
156
157         /* verify it doesn't already exist */
158         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
159         if (rc == 0)
160                 return -EEXIST;
161
162         if (o->od_dt_dev.dd_rdonly)
163                 return -EROFS;
164
165         /* create fid-to-dnode index */
166         tx = dmu_tx_create(o->od_os);
167         if (tx == NULL)
168                 return -ENOMEM;
169
170         dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL);
171         dmu_tx_hold_bonus(tx, parent);
172         dmu_tx_hold_zap(tx, parent, TRUE, name);
173         dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
174
175         rc = -dmu_tx_assign(tx, TXG_WAIT);
176         if (rc) {
177                 dmu_tx_abort(tx);
178                 return rc;
179         }
180
181         oid = osd_zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64,
182                                    DMU_OT_DIRECTORY_CONTENTS,
183                                    14, /* == ZFS fzap_default_block_shift */
184                                    DN_MAX_INDBLKSHIFT,
185                                    0, tx);
186
187         rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
188         if (rc)
189                 goto commit;
190         memset(la, 0, sizeof(*la));
191         la->la_valid = LA_MODE | LA_UID | LA_GID;
192         la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
193         rc = __osd_attr_init(env, o, NULL, sa_hdl, tx, la, parent, NULL);
194         sa_handle_destroy(sa_hdl);
195         if (rc)
196                 goto commit;
197
198         zde->zde_dnode = oid;
199         zde->zde_pad = 0;
200         zde->zde_type = IFTODT(S_IFDIR);
201
202         rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx);
203
204 commit:
205         if (rc)
206                 dmu_object_free(o->od_os, oid, tx);
207         dmu_tx_commit(tx);
208
209         if (rc == 0)
210                 *child = oid;
211
212         return rc;
213 }
214
215 static int
216 osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o,
217                       uint64_t parent, const char *name, uint64_t *child)
218 {
219         struct osd_oi   oi;
220         int             rc;
221
222         rc = osd_oi_lookup(env, o, parent, name, &oi);
223         if (rc == 0)
224                 *child = oi.oi_zapid;
225         else if (rc == -ENOENT)
226                 rc = osd_oi_create(env, o, parent, name, child);
227
228         return rc;
229 }
230
231 /**
232  * Lookup the target index/flags of the fid, so it will know where
233  * the object is located (tgt index) and it is MDT or OST object.
234  */
235 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
236                    u64 seq, struct lu_seq_range *range)
237 {
238         struct seq_server_site  *ss = osd_seq_site(osd);
239
240         if (fid_seq_is_idif(seq)) {
241                 fld_range_set_ost(range);
242                 range->lsr_index = idif_ost_idx(seq);
243                 return 0;
244         }
245
246         if (!fid_seq_in_fldb(seq)) {
247                 fld_range_set_mdt(range);
248                 if (ss != NULL)
249                         /* FIXME: If ss is NULL, it suppose not get lsr_index
250                          * at all */
251                         range->lsr_index = ss->ss_node_id;
252                 return 0;
253         }
254
255         LASSERT(ss != NULL);
256         fld_range_set_any(range);
257         /* OSD will only do local fld lookup */
258         return fld_local_lookup(env, ss->ss_server_fld, seq, range);
259 }
260
261 int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
262                   const struct lu_fid *fid)
263 {
264         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
265         int                     rc;
266         ENTRY;
267
268         if (fid_is_idif(fid))
269                 RETURN(1);
270
271         if (unlikely(fid_is_local_file(fid) || fid_is_llog(fid)) ||
272                      fid_is_name_llog(fid) || fid_is_quota(fid))
273                 RETURN(0);
274
275         rc = osd_fld_lookup(env, osd, fid_seq(fid), range);
276         if (rc != 0) {
277                 /* During upgrade, OST FLDB might not be loaded because
278                  * OST FLDB is not created until 2.6, so if some DNE
279                  * filesystem upgrade from 2.5 to 2.7/2.8, they will
280                  * not be able to find the sequence from local FLDB
281                  * cache see fld_index_init(). */
282                 if (rc == -ENOENT && osd->od_is_ost)
283                         RETURN(1);
284
285                 if (rc != -ENOENT)
286                         CERROR("%s: "DFID" lookup failed: rc = %d\n",
287                                osd_name(osd), PFID(fid), rc);
288                 RETURN(0);
289         }
290
291         if (fld_range_is_ost(range))
292                 RETURN(1);
293
294         RETURN(0);
295 }
296
297 static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list,
298                                            u64 seq)
299 {
300         struct osd_seq *osd_seq;
301
302         list_for_each_entry(osd_seq, &seq_list->osl_seq_list, os_seq_list) {
303                 if (osd_seq->os_seq == seq)
304                         return osd_seq;
305         }
306         return NULL;
307 }
308
309 static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, u64 seq)
310 {
311         struct osd_seq *osd_seq;
312
313         read_lock(&seq_list->osl_seq_list_lock);
314         osd_seq = osd_seq_find_locked(seq_list, seq);
315         read_unlock(&seq_list->osl_seq_list_lock);
316
317         return osd_seq;
318 }
319
320 static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env,
321                                            struct osd_device *osd, u64 seq)
322 {
323         struct osd_seq_list     *seq_list = &osd->od_seq_list;
324         struct osd_seq          *osd_seq;
325         char                    *key = osd_oti_get(env)->oti_buf;
326         char                    *seq_name = osd_oti_get(env)->oti_str;
327         struct osd_oi           oi;
328         uint64_t                sdb, odb;
329         int                     i;
330         int                     rc = 0;
331         ENTRY;
332
333         osd_seq = osd_seq_find(seq_list, seq);
334         if (osd_seq != NULL)
335                 RETURN(osd_seq);
336
337         down(&seq_list->osl_seq_init_sem);
338         /* Check again, in case some one else already add it
339          * to the list */
340         osd_seq = osd_seq_find(seq_list, seq);
341         if (osd_seq != NULL)
342                 GOTO(out, rc = 0);
343
344         OBD_ALLOC_PTR(osd_seq);
345         if (osd_seq == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         INIT_LIST_HEAD(&osd_seq->os_seq_list);
349         osd_seq->os_seq = seq;
350
351         /* Init subdir count to be 32, but each seq can have
352          * different subdir count */
353         osd_seq->os_subdir_count = OSD_OST_MAP_SIZE;
354         OBD_ALLOC(osd_seq->os_compat_dirs,
355                   sizeof(uint64_t) * osd_seq->os_subdir_count);
356         if (osd_seq->os_compat_dirs == NULL)
357                 GOTO(out, rc = -ENOMEM);
358
359         oi.oi_zapid = osd->od_O_id;
360         sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
361                 fid_seq_is_mdt0(seq)) ?  "%llu" : "%llx",
362                 fid_seq_is_idif(seq) ? 0 : seq);
363
364         rc = osd_oi_find_or_create(env, osd, oi.oi_zapid, seq_name, &odb);
365         if (rc != 0) {
366                 CERROR("%s: Can not create %s : rc = %d\n",
367                        osd_name(osd), seq_name, rc);
368                 GOTO(out, rc);
369         }
370
371         osd_seq->os_oid = odb;
372         for (i = 0; i < OSD_OST_MAP_SIZE; i++) {
373                 sprintf(key, "d%d", i);
374                 rc = osd_oi_find_or_create(env, osd, odb, key, &sdb);
375                 if (rc)
376                         GOTO(out, rc);
377                 osd_seq->os_compat_dirs[i] = sdb;
378         }
379
380         write_lock(&seq_list->osl_seq_list_lock);
381         list_add(&osd_seq->os_seq_list, &seq_list->osl_seq_list);
382         write_unlock(&seq_list->osl_seq_list_lock);
383 out:
384         up(&seq_list->osl_seq_init_sem);
385         if (rc != 0) {
386                 if (osd_seq != NULL && osd_seq->os_compat_dirs != NULL)
387                         OBD_FREE(osd_seq->os_compat_dirs,
388                                  sizeof(uint64_t) * osd_seq->os_subdir_count);
389                 if (osd_seq != NULL)
390                         OBD_FREE_PTR(osd_seq);
391                 osd_seq = ERR_PTR(rc);
392         }
393         RETURN(osd_seq);
394 }
395
396 static uint64_t
397 osd_get_idx_for_ost_obj_compat(const struct lu_env *env, struct osd_device *osd,
398                                const struct lu_fid *fid, char *buf, int bufsize)
399 {
400         struct osd_seq  *osd_seq;
401         unsigned long   b;
402         u64             id;
403         int             rc;
404
405         osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid));
406         if (IS_ERR(osd_seq)) {
407                 CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd),
408                        PFID(fid));
409                 return PTR_ERR(osd_seq);
410         }
411
412         if (fid_is_last_id(fid)) {
413                 id = 0;
414         } else {
415                 rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid);
416                 LASSERT(rc == 0); /* we should not get here with IGIF */
417                 id = ostid_id(&osd_oti_get(env)->oti_ostid);
418         }
419
420         b = id % OSD_OST_MAP_SIZE;
421         LASSERT(osd_seq->os_compat_dirs[b]);
422
423         if (buf)
424                 snprintf(buf, bufsize, "%llu", id);
425
426         return osd_seq->os_compat_dirs[b];
427 }
428
429 /*
430  * objects w/o a natural reference (unlike a file on a MDS)
431  * are put under a special hierarchy /O/<seq>/d0..dXX
432  * this function returns a directory specific fid belongs to
433  */
434 static uint64_t
435 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
436                         const struct lu_fid *fid, char *buf, int bufsize)
437 {
438         struct osd_seq  *osd_seq;
439         unsigned long   b;
440         u64             id;
441         int             rc;
442
443         osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid));
444         if (IS_ERR(osd_seq)) {
445                 CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd),
446                        PFID(fid));
447                 return PTR_ERR(osd_seq);
448         }
449
450         if (fid_is_last_id(fid)) {
451                 if (buf)
452                         snprintf(buf, bufsize, "LAST_ID");
453
454                 return osd_seq->os_oid;
455         }
456
457         rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid);
458         LASSERT(rc == 0); /* we should not get here with IGIF */
459
460         id = ostid_id(&osd_oti_get(env)->oti_ostid);
461         b = id % OSD_OST_MAP_SIZE;
462         LASSERT(osd_seq->os_compat_dirs[b]);
463
464         if (buf)
465                 snprintf(buf, bufsize, "%llu", id);
466
467         return osd_seq->os_compat_dirs[b];
468 }
469
470 /* XXX: f_ver is not counted, but may differ too */
471 static void osd_fid2str(char *buf, const struct lu_fid *fid)
472 {
473         sprintf(buf, DFID_NOBRACE, PFID(fid));
474 }
475
476 /*
477  * Determine the zap object id which is being used as the OI for the
478  * given fid.  The lowest N bits in the sequence ID are used as the
479  * index key.  On failure 0 is returned which zfs treats internally
480  * as an invalid object id.
481  */
482 static uint64_t
483 osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
484                     char *buf, dnode_t **zdn)
485 {
486         struct osd_oi *oi;
487
488         LASSERT(osd->od_oi_table != NULL);
489         oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
490         if (buf)
491                 osd_fid2str(buf, fid);
492         if (zdn)
493                 *zdn = oi->oi_dn;
494
495         return oi->oi_zapid;
496 }
497
498 uint64_t
499 osd_get_name_n_idx_compat(const struct lu_env *env, struct osd_device *osd,
500                           const struct lu_fid *fid, char *buf, int bufsize,
501                           dnode_t **zdn)
502 {
503         uint64_t zapid;
504
505         LASSERT(fid);
506         LASSERT(!fid_is_acct(fid));
507
508         if (zdn != NULL)
509                 *zdn = NULL;
510
511         if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) {
512                 zapid = osd_get_idx_for_ost_obj_compat(env, osd, fid,
513                                                        buf, bufsize);
514         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
515                 /* special objects with fixed known fids get their name */
516                 char *name = oid2name(fid_oid(fid));
517
518                 if (name) {
519                         zapid = osd->od_root;
520                         if (buf)
521                                 strncpy(buf, name, bufsize);
522                 } else {
523                         zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
524                 }
525         } else {
526                 zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
527         }
528
529         return zapid;
530 }
531
532 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
533                             const struct lu_fid *fid, char *buf, int bufsize,
534                             dnode_t **zdn)
535 {
536         uint64_t zapid;
537
538         LASSERT(fid);
539         LASSERT(!fid_is_acct(fid));
540
541         if (zdn != NULL)
542                 *zdn = NULL;
543
544         if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO ||
545             fid_is_last_id(fid)) {
546                 zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize);
547         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
548                 /* special objects with fixed known fids get their name */
549                 char *name = oid2name(fid_oid(fid));
550
551                 if (name) {
552                         zapid = osd->od_root;
553                         if (buf)
554                                 strncpy(buf, name, bufsize);
555                 } else {
556                         zapid = osd_get_idx_for_fid(osd, fid, buf, NULL);
557                 }
558         } else {
559                 zapid = osd_get_idx_for_fid(osd, fid, buf, zdn);
560         }
561
562         return zapid;
563 }
564
565 static inline int fid_is_fs_root(const struct lu_fid *fid)
566 {
567         /* Map root inode to special local object FID */
568         return fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
569                 fid_oid(fid) == OSD_FS_ROOT_OID;
570 }
571
572 int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
573                    const struct lu_fid *fid, uint64_t *oid)
574 {
575         struct osd_thread_info  *info = osd_oti_get(env);
576         char                    *buf = info->oti_buf;
577         dnode_t *zdn;
578         uint64_t zapid;
579         int                     rc = 0;
580         ENTRY;
581
582         if (OBD_FAIL_CHECK(OBD_FAIL_SRV_ENOENT))
583                 RETURN(-ENOENT);
584
585         LASSERT(!fid_is_acct(fid));
586
587         if (unlikely(fid_is_fs_root(fid))) {
588                 *oid = dev->od_root;
589         } else {
590                 zapid = osd_get_name_n_idx(env, dev, fid, buf,
591                                            sizeof(info->oti_buf), &zdn);
592                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
593                                     8, 1, &info->oti_zde);
594                 if (rc == -ENOENT) {
595                         if (unlikely(fid_is_last_id(fid))) {
596                                 zapid = osd_get_name_n_idx_compat(env, dev, fid,
597                                         buf, sizeof(info->oti_buf), &zdn);
598                                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
599                                                     8, 1, &info->oti_zde);
600                         } else if (fid_is_objseq(fid) || fid_is_batchid(fid)) {
601                                 zapid = osd_get_idx_for_fid(dev, fid,
602                                                             buf, NULL);
603                                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
604                                                     8, 1, &info->oti_zde);
605                         }
606                 }
607
608                 if (rc)
609                         RETURN(rc);
610                 *oid = info->oti_zde.lzd_reg.zde_dnode;
611         }
612
613         if (rc == 0)
614                 osd_dmu_prefetch(dev->od_os, *oid, 0, 0, 0,
615                                  ZIO_PRIORITY_ASYNC_READ);
616
617         RETURN(rc);
618 }
619
620 /**
621  * Close an entry in a specific slot.
622  */
623 static void
624 osd_oi_remove_table(const struct lu_env *env, struct osd_device *o, int key)
625 {
626         struct osd_oi *oi;
627
628         LASSERT(key < o->od_oi_count);
629
630         oi = o->od_oi_table[key];
631         if (oi) {
632                 if (oi->oi_dn)
633                         osd_dnode_rele(oi->oi_dn);
634                 OBD_FREE_PTR(oi);
635                 o->od_oi_table[key] = NULL;
636         }
637 }
638
639 /**
640  * Allocate and open a new entry in the specified unused slot.
641  */
642 static int
643 osd_oi_add_table(const struct lu_env *env, struct osd_device *o,
644                  char *name, int key)
645 {
646         struct osd_oi *oi;
647         int rc;
648
649         LASSERT(key < o->od_oi_count);
650         LASSERT(o->od_oi_table[key] == NULL);
651
652         OBD_ALLOC_PTR(oi);
653         if (oi == NULL)
654                 return -ENOMEM;
655
656         rc = osd_oi_lookup(env, o, o->od_root, name, oi);
657         if (rc) {
658                 OBD_FREE_PTR(oi);
659                 return rc;
660         }
661
662         o->od_oi_table[key] = oi;
663         __osd_obj2dnode(o->od_os, oi->oi_zapid, &oi->oi_dn);
664
665         return 0;
666 }
667
668 /**
669  * Depopulate the OI table.
670  */
671 static void
672 osd_oi_close_table(const struct lu_env *env, struct osd_device *o)
673 {
674         int i;
675
676         for (i = 0; i < o->od_oi_count; i++)
677                 osd_oi_remove_table(env, o, i);
678 }
679
680 /**
681  * Populate the OI table based.
682  */
683 static int
684 osd_oi_open_table(const struct lu_env *env, struct osd_device *o, int count)
685 {
686         char name[16];
687         int  i, rc = 0;
688         ENTRY;
689
690         for (i = 0; i < count; i++) {
691                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
692                 rc = osd_oi_add_table(env, o, name, i);
693                 if (rc) {
694                         osd_oi_close_table(env, o);
695                         break;
696                 }
697         }
698
699         RETURN(rc);
700 }
701
702 /**
703  * Determine if the type and number of OIs used by this file system.
704  */
705 static int
706 osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count)
707 {
708         uint64_t        root_oid = o->od_root;
709         struct osd_oi   oi;
710         char            name[16];
711         int             rc;
712         ENTRY;
713
714         /*
715          * Check for multiple OIs and determine the count.  There is no
716          * gap handling, if an OI is missing the wrong size can be returned.
717          * The only safeguard is that we know the number of OIs must be a
718          * power of two and this is checked for basic sanity.
719          */
720         for (*count = 0; *count < OSD_OI_FID_NR_MAX; (*count)++) {
721                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, *count);
722                 rc = osd_oi_lookup(env, o, root_oid, name, &oi);
723                 if (rc == 0)
724                         continue;
725
726                 if (rc == -ENOENT) {
727                         if (*count == 0)
728                                 break;
729
730                         if ((*count & (*count - 1)) != 0)
731                                 RETURN(-EDOM);
732
733                         RETURN(0);
734                 }
735
736                 RETURN(rc);
737         }
738
739         /*
740          * No OIs exist, this must be a new filesystem.
741          */
742         *count = 0;
743
744         RETURN(0);
745 }
746
747 static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd)
748 {
749         struct osd_seq_list     *osl = &osd->od_seq_list;
750         struct osd_seq          *osd_seq, *tmp;
751
752         write_lock(&osl->osl_seq_list_lock);
753         list_for_each_entry_safe(osd_seq, tmp, &osl->osl_seq_list,
754                                  os_seq_list) {
755                 list_del(&osd_seq->os_seq_list);
756                 OBD_FREE(osd_seq->os_compat_dirs,
757                          sizeof(uint64_t) * osd_seq->os_subdir_count);
758                 OBD_FREE(osd_seq, sizeof(*osd_seq));
759         }
760         write_unlock(&osl->osl_seq_list_lock);
761
762         return;
763 }
764
765 /**
766  * Create /O subdirectory to map legacy OST objects for compatibility.
767  */
768 static int
769 osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
770 {
771         uint64_t sdb;
772         int rc;
773         ENTRY;
774
775         rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb);
776         if (!rc)
777                 o->od_O_id = sdb;
778
779         RETURN(rc);
780 }
781
782 /**
783  * Initialize the OIs by either opening or creating them as needed.
784  */
785 int osd_oi_init(const struct lu_env *env, struct osd_device *o)
786 {
787         char    *key = osd_oti_get(env)->oti_buf;
788         int      i, rc, count = 0;
789         ENTRY;
790
791         rc = osd_oi_probe(env, o, &count);
792         if (rc)
793                 RETURN(rc);
794
795         if (count == 0) {
796                 uint64_t odb, sdb;
797
798                 count = osd_oi_count;
799                 odb = o->od_root;
800
801                 for (i = 0; i < count; i++) {
802                         sprintf(key, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
803                         rc = osd_oi_find_or_create(env, o, odb, key, &sdb);
804                         if (rc)
805                                 RETURN(rc);
806                 }
807         }
808
809         rc = osd_oi_init_compat(env, o);
810         if (rc)
811                 RETURN(rc);
812
813         LASSERT((count & (count - 1)) == 0);
814         o->od_oi_count = count;
815         OBD_ALLOC(o->od_oi_table, sizeof(struct osd_oi *) * count);
816         if (o->od_oi_table == NULL)
817                 RETURN(-ENOMEM);
818
819         rc = osd_oi_open_table(env, o, count);
820         if (rc) {
821                 OBD_FREE(o->od_oi_table, sizeof(struct osd_oi *) * count);
822                 o->od_oi_table = NULL;
823         }
824
825         RETURN(rc);
826 }
827
828 void osd_oi_fini(const struct lu_env *env, struct osd_device *o)
829 {
830         ENTRY;
831
832         osd_ost_seq_fini(env, o);
833
834         if (o->od_oi_table != NULL) {
835                 (void) osd_oi_close_table(env, o);
836                 OBD_FREE(o->od_oi_table,
837                          sizeof(struct osd_oi *) * o->od_oi_count);
838                 o->od_oi_table = NULL;
839                 o->od_oi_count = 0;
840         }
841
842         EXIT;
843 }
844
845 int osd_options_init(void)
846 {
847         /* osd_oi_count - Default number of OIs, 128 works well for ZFS */
848         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
849                 osd_oi_count = OSD_OI_FID_NR;
850
851         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
852                 LCONSOLE_WARN("Round up osd_oi_count %d to power2 %d\n",
853                         osd_oi_count, size_roundup_power2(osd_oi_count));
854                 osd_oi_count = size_roundup_power2(osd_oi_count);
855         }
856
857         return 0;
858 }
859
860 /*
861  * the following set of functions are used to maintain per-thread
862  * cache of FID->ino mapping. this mechanism is used to avoid
863  * expensive LU/OI lookups.
864  */
865 struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
866                                      struct osd_device *osd,
867                                      const struct lu_fid *fid)
868 {
869         struct osd_thread_info *oti = osd_oti_get(env);
870         struct osd_idmap_cache *idc = oti->oti_ins_cache;
871         int i;
872
873         for (i = 0; i < oti->oti_ins_cache_used; i++) {
874                 if (!lu_fid_eq(&idc[i].oic_fid, fid))
875                         continue;
876                 if (idc[i].oic_dev != osd)
877                         continue;
878
879                 return idc + i;
880         }
881
882         return NULL;
883 }
884
885 struct osd_idmap_cache *osd_idc_add(const struct lu_env *env,
886                                     struct osd_device *osd,
887                                     const struct lu_fid *fid)
888 {
889         struct osd_thread_info *oti = osd_oti_get(env);
890         struct osd_idmap_cache *idc;
891         int i;
892
893         if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
894                 i = oti->oti_ins_cache_size * 2;
895                 LASSERT(i < 1000);
896                 if (i == 0)
897                         i = OSD_INS_CACHE_SIZE;
898                 OBD_ALLOC(idc, sizeof(*idc) * i);
899                 if (idc == NULL)
900                         return ERR_PTR(-ENOMEM);
901                 if (oti->oti_ins_cache != NULL) {
902                         memcpy(idc, oti->oti_ins_cache,
903                                oti->oti_ins_cache_used * sizeof(*idc));
904                         OBD_FREE(oti->oti_ins_cache,
905                                  oti->oti_ins_cache_used * sizeof(*idc));
906                 }
907                 oti->oti_ins_cache = idc;
908                 oti->oti_ins_cache_size = i;
909         }
910
911         idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++];
912         idc->oic_fid = *fid;
913         idc->oic_dev = osd;
914         idc->oic_dnode = 0;
915         idc->oic_remote = 0;
916
917         return idc;
918 }
919
920 /**
921  * Lookup mapping for the given fid in the cache
922  *
923  * Initialize a new one if not found. the initialization checks whether
924  * the object is local or remote. for the local objects, OI is used to
925  * learn dnode#. the function is used when the caller has no information
926  * about the object, e.g. at dt_insert().
927  */
928 struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
929                                              struct osd_device *osd,
930                                              const struct lu_fid *fid)
931 {
932         struct osd_idmap_cache *idc;
933         int rc;
934
935         LASSERT(!fid_is_acct(fid));
936
937         idc = osd_idc_find(env, osd, fid);
938         if (idc != NULL)
939                 return idc;
940
941         /* new mapping is needed */
942         idc = osd_idc_add(env, osd, fid);
943         if (IS_ERR(idc))
944                 return idc;
945
946         /* initialize it */
947         rc = osd_remote_fid(env, osd, fid);
948         if (unlikely(rc < 0))
949                 return ERR_PTR(rc);
950
951         if (rc == 0) {
952                 /* the object is local, lookup in OI */
953                 uint64_t dnode;
954
955                 rc = osd_fid_lookup(env, osd, fid, &dnode);
956                 if (unlikely(rc < 0)) {
957                         CERROR("%s: can't lookup: rc = %d\n",
958                                osd->od_svname, rc);
959                         return ERR_PTR(rc);
960                 }
961                 LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT));
962                 idc->oic_dnode = dnode;
963         } else {
964                 /* the object is remote */
965                 idc->oic_remote = 1;
966         }
967
968         return idc;
969 }
970
971 /*
972  * lookup mapping for given FID and fill it from the given object.
973  * the object is local by definition.
974  */
975 int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
976                           struct osd_object *obj)
977 {
978         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
979         struct osd_idmap_cache *idc;
980
981         idc = osd_idc_find(env, osd, fid);
982         if (idc != NULL) {
983                 if (obj->oo_dn == NULL)
984                         return 0;
985                 idc->oic_dnode = obj->oo_dn->dn_object;
986                 return 0;
987         }
988
989         /* new mapping is needed */
990         idc = osd_idc_add(env, osd, fid);
991         if (IS_ERR(idc))
992                 return PTR_ERR(idc);
993
994         if (obj->oo_dn)
995                 idc->oic_dnode = obj->oo_dn->dn_object;
996
997         return 0;
998 }