Whamcloud - gitweb
LU-10192 osd-zfs: create agent entry for remote entry
[fs/lustre-release.git] / lustre / osd-zfs / osd_oi.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd-zfs/osd_oi.c
33  * OI functions to map fid to dnode
34  *
35  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
36  * Author: Mike Pershin <tappro@whamcloud.com>
37  * Author: Di Wang <di.wang@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_OSD
41
42 #include <libcfs/libcfs.h>
43 #include <obd_support.h>
44 #include <lustre_net.h>
45 #include <obd.h>
46 #include <obd_class.h>
47 #include <lustre_disk.h>
48 #include <lustre_fid.h>
49
50 #include "osd_internal.h"
51
52 #include <sys/dnode.h>
53 #include <sys/dbuf.h>
54 #include <sys/spa.h>
55 #include <sys/stat.h>
56 #include <sys/zap.h>
57 #include <sys/spa_impl.h>
58 #include <sys/zfs_znode.h>
59 #include <sys/dmu_tx.h>
60 #include <sys/dmu_objset.h>
61 #include <sys/dsl_prop.h>
62 #include <sys/sa_impl.h>
63 #include <sys/txg.h>
64 #include <lustre_scrub.h>
65
66 #define OSD_OI_FID_NR         (1UL << 7)
67 unsigned int osd_oi_count = OSD_OI_FID_NR;
68
69
70 /*
71  * zfs osd maintains names for known fids in the name hierarchy
72  * so that one can mount filesystem with regular ZFS stack and
73  * access files
74  */
75 struct named_oid {
76         unsigned long    oid;
77         char            *name;
78 };
79
80 static const struct named_oid oids[] = {
81         { .oid = LAST_RECV_OID,        .name = LAST_RCVD },
82         { .oid = OFD_LAST_GROUP_OID,   .name = "LAST_GROUP" },
83         { .oid = LLOG_CATALOGS_OID,    .name = "CATALOGS" },
84         { .oid = MGS_CONFIGS_OID,      /*MOUNT_CONFIGS_DIR*/ },
85         { .oid = FID_SEQ_SRV_OID,      .name = "seq_srv" },
86         { .oid = FID_SEQ_CTL_OID,      .name = "seq_ctl" },
87         { .oid = FLD_INDEX_OID,        .name = "fld" },
88         { .oid = MDD_LOV_OBJ_OID,      .name = LOV_OBJID },
89         { .oid = OFD_HEALTH_CHECK_OID, .name = HEALTH_CHECK },
90         { .oid = REPLY_DATA_OID,       .name = REPLY_DATA },
91         { .oid = MDD_LOV_OBJ_OSEQ,     .name = LOV_OBJSEQ },
92         { .oid = BATCHID_COMMITTED_OID, .name = "BATCHID" },
93         { .oid = 0 }
94 };
95
96 static inline bool fid_is_objseq(const struct lu_fid *fid)
97 {
98         return fid->f_seq == FID_SEQ_LOCAL_FILE &&
99                 fid->f_oid == MDD_LOV_OBJ_OSEQ;
100 }
101
102 static inline bool fid_is_batchid(const struct lu_fid *fid)
103 {
104         return fid->f_seq == FID_SEQ_LOCAL_FILE &&
105                 fid->f_oid == BATCHID_COMMITTED_OID;
106 }
107
108 static char *oid2name(const unsigned long oid)
109 {
110         int i = 0;
111
112         while (oids[i].oid) {
113                 if (oids[i].oid == oid)
114                         return oids[i].name;
115                 i++;
116         }
117         return NULL;
118 }
119
120 /**
121  * Lookup an existing OI by the given name.
122  */
123 static int
124 osd_oi_lookup(const struct lu_env *env, struct osd_device *o,
125               uint64_t parent, const char *name, struct osd_oi *oi)
126 {
127         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
128         int                      rc;
129
130         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
131         if (rc)
132                 return rc;
133
134         rc = strlcpy(oi->oi_name, name, sizeof(oi->oi_name));
135         if (rc >= sizeof(oi->oi_name))
136                 return -E2BIG;
137
138         oi->oi_zapid = zde->zde_dnode;
139
140         return 0;
141 }
142
143 /**
144  * Create a new OI with the given name.
145  */
146 static int
147 osd_oi_create(const struct lu_env *env, struct osd_device *o,
148               uint64_t parent, const char *name, uint64_t *child)
149 {
150         struct zpl_direntry     *zde = &osd_oti_get(env)->oti_zde.lzd_reg;
151         struct lu_attr          *la = &osd_oti_get(env)->oti_la;
152         sa_handle_t             *sa_hdl = NULL;
153         dmu_tx_t                *tx;
154         uint64_t                 oid;
155         int                      rc;
156
157         /* verify it doesn't already exist */
158         rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde);
159         if (rc == 0)
160                 return -EEXIST;
161
162         if (o->od_dt_dev.dd_rdonly)
163                 return -EROFS;
164
165         /* create fid-to-dnode index */
166         tx = dmu_tx_create(o->od_os);
167         if (tx == NULL)
168                 return -ENOMEM;
169
170         dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL);
171         dmu_tx_hold_bonus(tx, parent);
172         dmu_tx_hold_zap(tx, parent, TRUE, name);
173         dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE);
174
175         rc = -dmu_tx_assign(tx, TXG_WAIT);
176         if (rc) {
177                 dmu_tx_abort(tx);
178                 return rc;
179         }
180
181         oid = osd_zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64,
182                                    DMU_OT_DIRECTORY_CONTENTS,
183                                    14, /* == ZFS fzap_default_block_shift */
184                                    DN_MAX_INDBLKSHIFT,
185                                    0, tx);
186
187         rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl);
188         if (rc)
189                 goto commit;
190         memset(la, 0, sizeof(*la));
191         la->la_valid = LA_MODE | LA_UID | LA_GID;
192         la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
193         rc = __osd_attr_init(env, o, NULL, sa_hdl, tx, la, parent, NULL);
194         sa_handle_destroy(sa_hdl);
195         if (rc)
196                 goto commit;
197
198         zde->zde_dnode = oid;
199         zde->zde_pad = 0;
200         zde->zde_type = IFTODT(S_IFDIR);
201
202         rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx);
203
204 commit:
205         if (rc)
206                 dmu_object_free(o->od_os, oid, tx);
207         dmu_tx_commit(tx);
208
209         if (rc == 0)
210                 *child = oid;
211
212         return rc;
213 }
214
215 static int
216 osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o,
217                       uint64_t parent, const char *name, uint64_t *child)
218 {
219         struct osd_oi   oi;
220         int             rc;
221
222         rc = osd_oi_lookup(env, o, parent, name, &oi);
223         if (rc == 0)
224                 *child = oi.oi_zapid;
225         else if (rc == -ENOENT)
226                 rc = osd_oi_create(env, o, parent, name, child);
227
228         return rc;
229 }
230
231 /**
232  * Lookup the target index/flags of the fid, so it will know where
233  * the object is located (tgt index) and it is MDT or OST object.
234  */
235 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
236                    u64 seq, struct lu_seq_range *range)
237 {
238         struct seq_server_site  *ss = osd_seq_site(osd);
239
240         if (fid_seq_is_idif(seq)) {
241                 fld_range_set_ost(range);
242                 range->lsr_index = idif_ost_idx(seq);
243                 return 0;
244         }
245
246         if (!fid_seq_in_fldb(seq)) {
247                 fld_range_set_mdt(range);
248                 if (ss != NULL)
249                         /* FIXME: If ss is NULL, it suppose not get lsr_index
250                          * at all */
251                         range->lsr_index = ss->ss_node_id;
252                 return 0;
253         }
254
255         LASSERT(ss != NULL);
256         fld_range_set_any(range);
257         /* OSD will only do local fld lookup */
258         return fld_local_lookup(env, ss->ss_server_fld, seq, range);
259 }
260
261 int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
262                   const struct lu_fid *fid)
263 {
264         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
265         int                     rc;
266         ENTRY;
267
268         if (fid_is_idif(fid))
269                 RETURN(1);
270
271         if (unlikely(fid_is_local_file(fid) || fid_is_llog(fid)) ||
272                      fid_is_name_llog(fid) || fid_is_quota(fid))
273                 RETURN(0);
274
275         rc = osd_fld_lookup(env, osd, fid_seq(fid), range);
276         if (rc != 0) {
277                 /* During upgrade, OST FLDB might not be loaded because
278                  * OST FLDB is not created until 2.6, so if some DNE
279                  * filesystem upgrade from 2.5 to 2.7/2.8, they will
280                  * not be able to find the sequence from local FLDB
281                  * cache see fld_index_init(). */
282                 if (rc == -ENOENT && osd->od_is_ost)
283                         RETURN(1);
284
285                 if (rc != -ENOENT)
286                         CERROR("%s: "DFID" lookup failed: rc = %d\n",
287                                osd_name(osd), PFID(fid), rc);
288                 RETURN(0);
289         }
290
291         if (fld_range_is_ost(range))
292                 RETURN(1);
293
294         RETURN(0);
295 }
296
297 static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list,
298                                            u64 seq)
299 {
300         struct osd_seq *osd_seq;
301
302         list_for_each_entry(osd_seq, &seq_list->osl_seq_list, os_seq_list) {
303                 if (osd_seq->os_seq == seq)
304                         return osd_seq;
305         }
306         return NULL;
307 }
308
309 static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, u64 seq)
310 {
311         struct osd_seq *osd_seq;
312
313         read_lock(&seq_list->osl_seq_list_lock);
314         osd_seq = osd_seq_find_locked(seq_list, seq);
315         read_unlock(&seq_list->osl_seq_list_lock);
316
317         return osd_seq;
318 }
319
320 static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env,
321                                            struct osd_device *osd, u64 seq)
322 {
323         struct osd_seq_list     *seq_list = &osd->od_seq_list;
324         struct osd_seq          *osd_seq;
325         char                    *key = osd_oti_get(env)->oti_buf;
326         char                    *seq_name = osd_oti_get(env)->oti_str;
327         struct osd_oi           oi;
328         uint64_t                sdb, odb;
329         int                     i;
330         int                     rc = 0;
331         ENTRY;
332
333         osd_seq = osd_seq_find(seq_list, seq);
334         if (osd_seq != NULL)
335                 RETURN(osd_seq);
336
337         down(&seq_list->osl_seq_init_sem);
338         /* Check again, in case some one else already add it
339          * to the list */
340         osd_seq = osd_seq_find(seq_list, seq);
341         if (osd_seq != NULL)
342                 GOTO(out, rc = 0);
343
344         OBD_ALLOC_PTR(osd_seq);
345         if (osd_seq == NULL)
346                 GOTO(out, rc = -ENOMEM);
347
348         INIT_LIST_HEAD(&osd_seq->os_seq_list);
349         osd_seq->os_seq = seq;
350
351         /* Init subdir count to be 32, but each seq can have
352          * different subdir count */
353         osd_seq->os_subdir_count = OSD_OST_MAP_SIZE;
354         OBD_ALLOC(osd_seq->os_compat_dirs,
355                   sizeof(uint64_t) * osd_seq->os_subdir_count);
356         if (osd_seq->os_compat_dirs == NULL)
357                 GOTO(out, rc = -ENOMEM);
358
359         oi.oi_zapid = osd->od_O_id;
360         sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
361                 fid_seq_is_mdt0(seq)) ?  "%llu" : "%llx",
362                 fid_seq_is_idif(seq) ? 0 : seq);
363
364         rc = osd_oi_find_or_create(env, osd, oi.oi_zapid, seq_name, &odb);
365         if (rc != 0) {
366                 CERROR("%s: Can not create %s : rc = %d\n",
367                        osd_name(osd), seq_name, rc);
368                 GOTO(out, rc);
369         }
370
371         osd_seq->os_oid = odb;
372         for (i = 0; i < OSD_OST_MAP_SIZE; i++) {
373                 sprintf(key, "d%d", i);
374                 rc = osd_oi_find_or_create(env, osd, odb, key, &sdb);
375                 if (rc)
376                         GOTO(out, rc);
377                 osd_seq->os_compat_dirs[i] = sdb;
378         }
379
380         write_lock(&seq_list->osl_seq_list_lock);
381         list_add(&osd_seq->os_seq_list, &seq_list->osl_seq_list);
382         write_unlock(&seq_list->osl_seq_list_lock);
383 out:
384         up(&seq_list->osl_seq_init_sem);
385         if (rc != 0) {
386                 if (osd_seq != NULL && osd_seq->os_compat_dirs != NULL)
387                         OBD_FREE(osd_seq->os_compat_dirs,
388                                  sizeof(uint64_t) * osd_seq->os_subdir_count);
389                 if (osd_seq != NULL)
390                         OBD_FREE_PTR(osd_seq);
391                 osd_seq = ERR_PTR(rc);
392         }
393         RETURN(osd_seq);
394 }
395
396 static uint64_t
397 osd_get_idx_for_ost_obj_compat(const struct lu_env *env, struct osd_device *osd,
398                                const struct lu_fid *fid, char *buf, int bufsize)
399 {
400         struct osd_seq  *osd_seq;
401         unsigned long   b;
402         u64             id;
403         int             rc;
404
405         osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid));
406         if (IS_ERR(osd_seq)) {
407                 CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd),
408                        PFID(fid));
409                 return PTR_ERR(osd_seq);
410         }
411
412         if (fid_is_last_id(fid)) {
413                 id = 0;
414         } else {
415                 rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid);
416                 LASSERT(rc == 0); /* we should not get here with IGIF */
417                 id = ostid_id(&osd_oti_get(env)->oti_ostid);
418         }
419
420         b = id % OSD_OST_MAP_SIZE;
421         LASSERT(osd_seq->os_compat_dirs[b]);
422
423         if (buf)
424                 snprintf(buf, bufsize, "%llu", id);
425
426         return osd_seq->os_compat_dirs[b];
427 }
428
429 /*
430  * objects w/o a natural reference (unlike a file on a MDS)
431  * are put under a special hierarchy /O/<seq>/d0..dXX
432  * this function returns a directory specific fid belongs to
433  */
434 static uint64_t
435 osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd,
436                         const struct lu_fid *fid, char *buf, int bufsize)
437 {
438         struct osd_seq  *osd_seq;
439         unsigned long   b;
440         u64             id;
441         int             rc;
442
443         osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid));
444         if (IS_ERR(osd_seq)) {
445                 CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd),
446                        PFID(fid));
447                 return PTR_ERR(osd_seq);
448         }
449
450         if (fid_is_last_id(fid)) {
451                 if (buf)
452                         snprintf(buf, bufsize, "LAST_ID");
453
454                 return osd_seq->os_oid;
455         }
456
457         rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid);
458         LASSERT(rc == 0); /* we should not get here with IGIF */
459
460         id = ostid_id(&osd_oti_get(env)->oti_ostid);
461         b = id % OSD_OST_MAP_SIZE;
462         LASSERT(osd_seq->os_compat_dirs[b]);
463
464         if (buf)
465                 snprintf(buf, bufsize, "%llu", id);
466
467         return osd_seq->os_compat_dirs[b];
468 }
469
470 /*
471  * Determine the zap object id which is being used as the OI for the
472  * given fid.  The lowest N bits in the sequence ID are used as the
473  * index key.  On failure 0 is returned which zfs treats internally
474  * as an invalid object id.
475  */
476 static uint64_t
477 osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid,
478                     char *buf, dnode_t **zdn, int bufsize)
479 {
480         struct osd_oi *oi;
481
482         LASSERT(osd->od_oi_table != NULL);
483         oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)];
484         if (buf)
485                 osd_fid2str(buf, fid, bufsize);
486         if (zdn)
487                 *zdn = oi->oi_dn;
488
489         return oi->oi_zapid;
490 }
491
492 uint64_t
493 osd_get_name_n_idx_compat(const struct lu_env *env, struct osd_device *osd,
494                           const struct lu_fid *fid, char *buf, int bufsize,
495                           dnode_t **zdn)
496 {
497         uint64_t zapid;
498
499         LASSERT(fid);
500         LASSERT(!fid_is_acct(fid));
501
502         if (zdn != NULL)
503                 *zdn = NULL;
504
505         if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) {
506                 zapid = osd_get_idx_for_ost_obj_compat(env, osd, fid,
507                                                        buf, bufsize);
508         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
509                 /* special objects with fixed known fids get their name */
510                 char *name = oid2name(fid_oid(fid));
511
512                 if (name) {
513                         zapid = osd->od_root;
514                         if (buf)
515                                 strncpy(buf, name, bufsize);
516                 } else {
517                         zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
518                                                     bufsize);
519                 }
520         } else {
521                 zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
522         }
523
524         return zapid;
525 }
526
527 uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
528                             const struct lu_fid *fid, char *buf, int bufsize,
529                             dnode_t **zdn)
530 {
531         uint64_t zapid;
532
533         LASSERT(fid);
534         LASSERT(!fid_is_acct(fid));
535
536         if (zdn != NULL)
537                 *zdn = NULL;
538
539         if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO ||
540             fid_is_last_id(fid)) {
541                 zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize);
542         } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
543                 /* special objects with fixed known fids get their name */
544                 char *name = oid2name(fid_oid(fid));
545
546                 if (name) {
547                         zapid = osd->od_root;
548                         if (buf)
549                                 strncpy(buf, name, bufsize);
550                 } else {
551                         zapid = osd_get_idx_for_fid(osd, fid, buf, NULL,
552                                                     bufsize);
553                 }
554         } else {
555                 zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize);
556         }
557
558         return zapid;
559 }
560
561 static inline int fid_is_fs_root(const struct lu_fid *fid)
562 {
563         /* Map root inode to special local object FID */
564         return fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
565                 fid_oid(fid) == OSD_FS_ROOT_OID;
566 }
567
568 int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev,
569                    const struct lu_fid *fid, uint64_t *oid)
570 {
571         struct osd_thread_info  *info = osd_oti_get(env);
572         char                    *buf = info->oti_buf;
573         dnode_t *zdn;
574         uint64_t zapid;
575         int                     rc = 0;
576         ENTRY;
577
578         if (OBD_FAIL_CHECK(OBD_FAIL_SRV_ENOENT))
579                 RETURN(-ENOENT);
580
581         LASSERT(!fid_is_acct(fid));
582
583         if (unlikely(fid_is_fs_root(fid))) {
584                 *oid = dev->od_root;
585         } else {
586                 zapid = osd_get_name_n_idx(env, dev, fid, buf,
587                                            sizeof(info->oti_buf), &zdn);
588                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
589                                     8, 1, &info->oti_zde);
590                 if (rc == -ENOENT) {
591                         if (unlikely(fid_is_last_id(fid))) {
592                                 zapid = osd_get_name_n_idx_compat(env, dev, fid,
593                                         buf, sizeof(info->oti_buf), &zdn);
594                                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
595                                                     8, 1, &info->oti_zde);
596                         } else if (fid_is_objseq(fid) || fid_is_batchid(fid)) {
597                                 zapid = osd_get_idx_for_fid(dev, fid,
598                                         buf, NULL, sizeof(info->oti_buf));
599                                 rc = osd_zap_lookup(dev, zapid, zdn, buf,
600                                                     8, 1, &info->oti_zde);
601                         }
602                 }
603
604                 if (rc)
605                         RETURN(rc);
606                 *oid = info->oti_zde.lzd_reg.zde_dnode;
607         }
608
609         if (rc == 0)
610                 osd_dmu_prefetch(dev->od_os, *oid, 0, 0, 0,
611                                  ZIO_PRIORITY_ASYNC_READ);
612
613         RETURN(rc);
614 }
615
616 /**
617  * Close an entry in a specific slot.
618  */
619 static void
620 osd_oi_remove_table(const struct lu_env *env, struct osd_device *o, int key)
621 {
622         struct osd_oi *oi;
623
624         LASSERT(key < o->od_oi_count);
625
626         oi = o->od_oi_table[key];
627         if (oi) {
628                 if (oi->oi_dn)
629                         osd_dnode_rele(oi->oi_dn);
630                 OBD_FREE_PTR(oi);
631                 o->od_oi_table[key] = NULL;
632         }
633 }
634
635 /**
636  * Allocate and open a new entry in the specified unused slot.
637  */
638 static int
639 osd_oi_add_table(const struct lu_env *env, struct osd_device *o,
640                  char *name, int key)
641 {
642         struct osd_oi *oi;
643         int rc;
644
645         LASSERT(key < o->od_oi_count);
646         LASSERT(o->od_oi_table[key] == NULL);
647
648         OBD_ALLOC_PTR(oi);
649         if (oi == NULL)
650                 return -ENOMEM;
651
652         rc = osd_oi_lookup(env, o, o->od_root, name, oi);
653         if (rc) {
654                 OBD_FREE_PTR(oi);
655                 return rc;
656         }
657
658         o->od_oi_table[key] = oi;
659         __osd_obj2dnode(o->od_os, oi->oi_zapid, &oi->oi_dn);
660
661         return 0;
662 }
663
664 /**
665  * Depopulate the OI table.
666  */
667 static void
668 osd_oi_close_table(const struct lu_env *env, struct osd_device *o)
669 {
670         int i;
671
672         for (i = 0; i < o->od_oi_count; i++)
673                 osd_oi_remove_table(env, o, i);
674 }
675
676 /**
677  * Populate the OI table based.
678  */
679 static int
680 osd_oi_open_table(const struct lu_env *env, struct osd_device *o, int count)
681 {
682         char name[16];
683         int  i, rc = 0;
684         ENTRY;
685
686         for (i = 0; i < count; i++) {
687                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
688                 rc = osd_oi_add_table(env, o, name, i);
689                 if (rc) {
690                         osd_oi_close_table(env, o);
691                         break;
692                 }
693         }
694
695         RETURN(rc);
696 }
697
698 /**
699  * Determine if the type and number of OIs used by this file system.
700  */
701 static int
702 osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count)
703 {
704         uint64_t        root_oid = o->od_root;
705         struct osd_oi   oi;
706         char            name[16];
707         int             rc;
708         ENTRY;
709
710         /*
711          * Check for multiple OIs and determine the count.  There is no
712          * gap handling, if an OI is missing the wrong size can be returned.
713          * The only safeguard is that we know the number of OIs must be a
714          * power of two and this is checked for basic sanity.
715          */
716         for (*count = 0; *count < OSD_OI_FID_NR_MAX; (*count)++) {
717                 sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, *count);
718                 rc = osd_oi_lookup(env, o, root_oid, name, &oi);
719                 if (rc == 0)
720                         continue;
721
722                 if (rc == -ENOENT) {
723                         if (*count == 0)
724                                 break;
725
726                         if ((*count & (*count - 1)) != 0)
727                                 RETURN(-EDOM);
728
729                         RETURN(0);
730                 }
731
732                 RETURN(rc);
733         }
734
735         /*
736          * No OIs exist, this must be a new filesystem.
737          */
738         *count = 0;
739
740         RETURN(0);
741 }
742
743 static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd)
744 {
745         struct osd_seq_list     *osl = &osd->od_seq_list;
746         struct osd_seq          *osd_seq, *tmp;
747
748         write_lock(&osl->osl_seq_list_lock);
749         list_for_each_entry_safe(osd_seq, tmp, &osl->osl_seq_list,
750                                  os_seq_list) {
751                 list_del(&osd_seq->os_seq_list);
752                 OBD_FREE(osd_seq->os_compat_dirs,
753                          sizeof(uint64_t) * osd_seq->os_subdir_count);
754                 OBD_FREE(osd_seq, sizeof(*osd_seq));
755         }
756         write_unlock(&osl->osl_seq_list_lock);
757
758         return;
759 }
760
761 /**
762  * Create /O subdirectory to map legacy OST objects for compatibility.
763  */
764 static int
765 osd_oi_init_compat(const struct lu_env *env, struct osd_device *o)
766 {
767         uint64_t sdb;
768         int rc;
769         ENTRY;
770
771         rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb);
772         if (!rc)
773                 o->od_O_id = sdb;
774
775         RETURN(rc);
776 }
777
778 static void
779 osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
780 {
781         uint64_t sdb;
782         int rc;
783         ENTRY;
784
785         if (o->od_is_ost) {
786                 o->od_remote_parent_dir = ZFS_NO_OBJECT;
787         } else {
788                 /* Remote parent only used for cross-MDT objects,
789                  * it is usless for single MDT case or under read
790                  * only mode. So ignore the failure. */
791                 rc = osd_oi_find_or_create(env, o, o->od_root,
792                                            REMOTE_PARENT_DIR, &sdb);
793                 if (!rc)
794                         o->od_remote_parent_dir = sdb;
795                 else
796                         o->od_remote_parent_dir = ZFS_NO_OBJECT;
797         }
798 }
799
800 /**
801  * Initialize the OIs by either opening or creating them as needed.
802  */
803 int osd_oi_init(const struct lu_env *env, struct osd_device *o)
804 {
805         char    *key = osd_oti_get(env)->oti_buf;
806         int      i, rc, count = 0;
807         ENTRY;
808
809         osd_oi_init_remote_parent(env, o);
810
811         rc = osd_oi_probe(env, o, &count);
812         if (rc)
813                 RETURN(rc);
814
815         if (count == 0) {
816                 uint64_t odb, sdb;
817
818                 count = osd_oi_count;
819                 odb = o->od_root;
820
821                 for (i = 0; i < count; i++) {
822                         sprintf(key, "%s.%d", DMU_OSD_OI_NAME_BASE, i);
823                         rc = osd_oi_find_or_create(env, o, odb, key, &sdb);
824                         if (rc)
825                                 RETURN(rc);
826                 }
827         }
828
829         rc = osd_oi_init_compat(env, o);
830         if (rc)
831                 RETURN(rc);
832
833         LASSERT((count & (count - 1)) == 0);
834         o->od_oi_count = count;
835         OBD_ALLOC(o->od_oi_table, sizeof(struct osd_oi *) * count);
836         if (o->od_oi_table == NULL)
837                 RETURN(-ENOMEM);
838
839         rc = osd_oi_open_table(env, o, count);
840         if (rc) {
841                 OBD_FREE(o->od_oi_table, sizeof(struct osd_oi *) * count);
842                 o->od_oi_table = NULL;
843         }
844
845         RETURN(rc);
846 }
847
848 void osd_oi_fini(const struct lu_env *env, struct osd_device *o)
849 {
850         ENTRY;
851
852         osd_ost_seq_fini(env, o);
853
854         if (o->od_oi_table != NULL) {
855                 (void) osd_oi_close_table(env, o);
856                 OBD_FREE(o->od_oi_table,
857                          sizeof(struct osd_oi *) * o->od_oi_count);
858                 o->od_oi_table = NULL;
859                 o->od_oi_count = 0;
860         }
861
862         EXIT;
863 }
864
865 int osd_options_init(void)
866 {
867         /* osd_oi_count - Default number of OIs, 128 works well for ZFS */
868         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
869                 osd_oi_count = OSD_OI_FID_NR;
870
871         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
872                 LCONSOLE_WARN("Round up osd_oi_count %d to power2 %d\n",
873                         osd_oi_count, size_roundup_power2(osd_oi_count));
874                 osd_oi_count = size_roundup_power2(osd_oi_count);
875         }
876
877         return 0;
878 }
879
880 /*
881  * the following set of functions are used to maintain per-thread
882  * cache of FID->ino mapping. this mechanism is used to avoid
883  * expensive LU/OI lookups.
884  */
885 struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
886                                      struct osd_device *osd,
887                                      const struct lu_fid *fid)
888 {
889         struct osd_thread_info *oti = osd_oti_get(env);
890         struct osd_idmap_cache *idc = oti->oti_ins_cache;
891         int i;
892
893         for (i = 0; i < oti->oti_ins_cache_used; i++) {
894                 if (!lu_fid_eq(&idc[i].oic_fid, fid))
895                         continue;
896                 if (idc[i].oic_dev != osd)
897                         continue;
898
899                 return idc + i;
900         }
901
902         return NULL;
903 }
904
905 struct osd_idmap_cache *osd_idc_add(const struct lu_env *env,
906                                     struct osd_device *osd,
907                                     const struct lu_fid *fid)
908 {
909         struct osd_thread_info *oti = osd_oti_get(env);
910         struct osd_idmap_cache *idc;
911         int i;
912
913         if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
914                 i = oti->oti_ins_cache_size * 2;
915                 LASSERT(i < 1000);
916                 if (i == 0)
917                         i = OSD_INS_CACHE_SIZE;
918                 OBD_ALLOC(idc, sizeof(*idc) * i);
919                 if (idc == NULL)
920                         return ERR_PTR(-ENOMEM);
921                 if (oti->oti_ins_cache != NULL) {
922                         memcpy(idc, oti->oti_ins_cache,
923                                oti->oti_ins_cache_used * sizeof(*idc));
924                         OBD_FREE(oti->oti_ins_cache,
925                                  oti->oti_ins_cache_used * sizeof(*idc));
926                 }
927                 oti->oti_ins_cache = idc;
928                 oti->oti_ins_cache_size = i;
929         }
930
931         idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++];
932         idc->oic_fid = *fid;
933         idc->oic_dev = osd;
934         idc->oic_dnode = 0;
935         idc->oic_remote = 0;
936
937         return idc;
938 }
939
940 /**
941  * Lookup mapping for the given fid in the cache
942  *
943  * Initialize a new one if not found. the initialization checks whether
944  * the object is local or remote. for the local objects, OI is used to
945  * learn dnode#. the function is used when the caller has no information
946  * about the object, e.g. at dt_insert().
947  */
948 struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
949                                              struct osd_device *osd,
950                                              const struct lu_fid *fid)
951 {
952         struct osd_idmap_cache *idc;
953         int rc;
954
955         LASSERT(!fid_is_acct(fid));
956
957         idc = osd_idc_find(env, osd, fid);
958         if (idc != NULL)
959                 return idc;
960
961         /* new mapping is needed */
962         idc = osd_idc_add(env, osd, fid);
963         if (IS_ERR(idc))
964                 return idc;
965
966         /* initialize it */
967         rc = osd_remote_fid(env, osd, fid);
968         if (unlikely(rc < 0))
969                 return ERR_PTR(rc);
970
971         if (rc == 0) {
972                 /* the object is local, lookup in OI */
973                 uint64_t dnode;
974
975                 rc = osd_fid_lookup(env, osd, fid, &dnode);
976                 if (unlikely(rc < 0)) {
977                         CERROR("%s: can't lookup: rc = %d\n",
978                                osd->od_svname, rc);
979                         return ERR_PTR(rc);
980                 }
981                 LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT));
982                 idc->oic_dnode = dnode;
983         } else {
984                 /* the object is remote */
985                 idc->oic_remote = 1;
986         }
987
988         return idc;
989 }
990
991 /*
992  * lookup mapping for given FID and fill it from the given object.
993  * the object is local by definition.
994  */
995 int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
996                           struct osd_object *obj)
997 {
998         const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
999         struct osd_idmap_cache *idc;
1000
1001         idc = osd_idc_find(env, osd, fid);
1002         if (idc != NULL) {
1003                 if (obj->oo_dn == NULL)
1004                         return 0;
1005                 idc->oic_dnode = obj->oo_dn->dn_object;
1006                 return 0;
1007         }
1008
1009         /* new mapping is needed */
1010         idc = osd_idc_add(env, osd, fid);
1011         if (IS_ERR(idc))
1012                 return PTR_ERR(idc);
1013
1014         if (obj->oo_dn)
1015                 idc->oic_dnode = obj->oo_dn->dn_object;
1016
1017         return 0;
1018 }