X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;ds=sidebyside;f=lustre%2Fosd-zfs%2Fosd_oi.c;h=af618f07057d9e05d1e9f7b123be90c41f538868;hb=77f5bb4dace53e0040dea4ce5c72156e901e6819;hp=371c442e5468efa878622bb7eb4f0eaea2d87976;hpb=745c19c70319915a55b71b81b4e89d68e3a4e272;p=fs%2Flustre-release.git diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 371c442..af618f0 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -15,21 +15,15 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. - */ -/* - * Copyright (c) 2012, 2013, Intel Corporation. - * Use is subject to license terms. + * + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -45,7 +39,6 @@ #define DEBUG_SUBSYSTEM S_OSD -#include #include #include #include @@ -68,9 +61,9 @@ #include #include #include +#include #define OSD_OI_FID_NR (1UL << 7) -#define OSD_OI_FID_NR_MAX (1UL << OSD_OI_FID_OID_BITS_MAX) unsigned int osd_oi_count = OSD_OI_FID_NR; @@ -85,20 +78,33 @@ struct named_oid { }; static const struct named_oid oids[] = { - { LAST_RECV_OID, LAST_RCVD }, - { OFD_LAST_GROUP_OID, "LAST_GROUP" }, - { LLOG_CATALOGS_OID, "CATALOGS" }, - { MGS_CONFIGS_OID, NULL /*MOUNT_CONFIGS_DIR*/ }, - { FID_SEQ_SRV_OID, "seq_srv" }, - { FID_SEQ_CTL_OID, "seq_ctl" }, - { FLD_INDEX_OID, "fld" }, - { MDD_LOV_OBJ_OID, LOV_OBJID }, - { OFD_HEALTH_CHECK_OID, HEALTH_CHECK }, - { ACCT_USER_OID, "acct_usr_inode" }, - { ACCT_GROUP_OID, "acct_grp_inode" }, - { 0, NULL } + { .oid = LAST_RECV_OID, .name = LAST_RCVD }, + { .oid = OFD_LAST_GROUP_OID, .name = "LAST_GROUP" }, + { .oid = LLOG_CATALOGS_OID, .name = "CATALOGS" }, + { .oid = MGS_CONFIGS_OID, /*MOUNT_CONFIGS_DIR*/ }, + { .oid = FID_SEQ_SRV_OID, .name = "seq_srv" }, + { .oid = FID_SEQ_CTL_OID, .name = "seq_ctl" }, + { .oid = FLD_INDEX_OID, .name = "fld" }, + { .oid = MDD_LOV_OBJ_OID, .name = LOV_OBJID }, + { .oid = OFD_HEALTH_CHECK_OID, .name = HEALTH_CHECK }, + { .oid = REPLY_DATA_OID, .name = REPLY_DATA }, + { .oid = MDD_LOV_OBJ_OSEQ, .name = LOV_OBJSEQ }, + { .oid = BATCHID_COMMITTED_OID, .name = "BATCHID" }, + { .oid = 0 } }; +static inline bool fid_is_objseq(const struct lu_fid *fid) +{ + return fid->f_seq == FID_SEQ_LOCAL_FILE && + fid->f_oid == MDD_LOV_OBJ_OSEQ; +} + +static inline bool fid_is_batchid(const struct lu_fid *fid) +{ + return fid->f_seq == FID_SEQ_LOCAL_FILE && + fid->f_oid == BATCHID_COMMITTED_OID; +} + static char *oid2name(const unsigned long oid) { int i = 0; @@ -129,63 +135,96 @@ osd_oi_lookup(const struct lu_env *env, struct osd_device *o, if (rc >= sizeof(oi->oi_name)) return -E2BIG; - rc = 0; oi->oi_zapid = zde->zde_dnode; - return rc; + return 0; } -/** - * Create a new OI with the given name. - */ -static int -osd_oi_create(const struct lu_env *env, struct osd_device *o, - uint64_t parent, const char *name, uint64_t *child) +static int osd_obj_create(const struct lu_env *env, struct osd_device *o, + uint64_t parent, const char *name, uint64_t *child, + const struct lu_fid *fid, bool isdir) { - struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; - struct lu_attr *la = &osd_oti_get(env)->oti_la; - dmu_buf_t *db; - dmu_tx_t *tx; - int rc; + struct osd_thread_info *info = osd_oti_get(env); + struct zpl_direntry *zde = &info->oti_zde.lzd_reg; + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct lu_attr *la = &info->oti_la; + sa_handle_t *sa_hdl = NULL; + nvlist_t *nvbuf = NULL; + dmu_tx_t *tx; + uint64_t oid; + __u32 compat = LMAC_NOT_IN_OI; + int rc; + ENTRY; - /* verify it doesn't already exist */ - rc = -zap_lookup(o->od_os, parent, name, 8, 1, (void *)zde); - if (rc == 0) - return -EEXIST; + if (o->od_dt_dev.dd_rdonly) + RETURN(-EROFS); + + memset(la, 0, sizeof(*la)); + la->la_valid = LA_MODE | LA_UID | LA_GID; + la->la_mode = S_IRUGO | S_IWUSR | (isdir ? S_IXUGO | S_IFDIR : S_IFREG); + + if (fid) { + rc = -nvlist_alloc(&nvbuf, NV_UNIQUE_NAME, KM_SLEEP); + if (rc) + RETURN(rc); + + if (o->od_is_ost) + compat |= LMAC_FID_ON_OST; + lustre_lma_init(lma, fid, compat, 0); + lustre_lma_swab(lma); + rc = -nvlist_add_byte_array(nvbuf, XATTR_NAME_LMA, + (uchar_t *)lma, sizeof(*lma)); + if (rc) + GOTO(out, rc); + } /* create fid-to-dnode index */ tx = dmu_tx_create(o->od_os); - if (tx == NULL) - return -ENOMEM; + if (!tx) + GOTO(out, rc = -ENOMEM); - dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL); + dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, FALSE, NULL); dmu_tx_hold_bonus(tx, parent); dmu_tx_hold_zap(tx, parent, TRUE, name); - LASSERT(tx->tx_objset->os_sa); dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE); - rc = -dmu_tx_assign(tx, TXG_WAIT); if (rc) { dmu_tx_abort(tx); - return rc; + GOTO(out, rc); } - la->la_valid = LA_MODE | LA_UID | LA_GID; - la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; - la->la_uid = la->la_gid = 0; - __osd_zap_create(env, o, &db, tx, la, parent, 0); + if (isdir) + oid = osd_zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64, + DMU_OT_DIRECTORY_CONTENTS, + 14, DN_MAX_INDBLKSHIFT, 0, tx); + else + oid = osd_dmu_object_alloc(o->od_os, DMU_OTN_UINT8_METADATA, + 0, 0, tx); + rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc) + GOTO(commit, rc); - zde->zde_dnode = db->db_object; - zde->zde_pad = 0; - zde->zde_type = IFTODT(S_IFDIR); + rc = __osd_attr_init(env, o, NULL, sa_hdl, tx, la, parent, nvbuf); + sa_handle_destroy(sa_hdl); + if (rc) + GOTO(commit, rc); + zde->zde_dnode = oid; + zde->zde_pad = 0; + zde->zde_type = S_DT(isdir ? S_IFDIR : S_IFREG); rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx); - dmu_tx_commit(tx); - - *child = db->db_object; - sa_buf_rele(db, osd_obj_tag); + GOTO(commit, rc); +commit: + if (rc) + dmu_object_free(o->od_os, oid, tx); + else + *child = oid; + dmu_tx_commit(tx); +out: + if (nvbuf) + nvlist_free(nvbuf); return rc; } @@ -200,7 +239,23 @@ osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o, if (rc == 0) *child = oi.oi_zapid; else if (rc == -ENOENT) - rc = osd_oi_create(env, o, parent, name, child); + rc = osd_obj_create(env, o, parent, name, child, NULL, true); + + return rc; +} + +int osd_obj_find_or_create(const struct lu_env *env, struct osd_device *o, + uint64_t parent, const char *name, uint64_t *child, + const struct lu_fid *fid, bool isdir) +{ + struct osd_oi oi; + int rc; + + rc = osd_oi_lookup(env, o, parent, name, &oi); + if (!rc) + *child = oi.oi_zapid; + else if (rc == -ENOENT) + rc = osd_obj_create(env, o, parent, name, child, fid, isdir); return rc; } @@ -210,7 +265,7 @@ osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o, * the object is located (tgt index) and it is MDT or OST object. */ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, - obd_seq seq, struct lu_seq_range *range) + u64 seq, struct lu_seq_range *range) { struct seq_server_site *ss = osd_seq_site(osd); @@ -229,7 +284,11 @@ int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, return 0; } - LASSERT(ss != NULL); + /* The seq_server_site may be NOT ready during initial OI scrub */ + if (unlikely(!ss || !ss->ss_server_fld || + !ss->ss_server_fld->lsf_cache)) + return -ENOENT; + fld_range_set_any(range); /* OSD will only do local fld lookup */ return fld_local_lookup(env, ss->ss_server_fld, seq, range); @@ -246,11 +305,20 @@ int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd, RETURN(1); if (unlikely(fid_is_local_file(fid) || fid_is_llog(fid)) || - fid_is_name_llog(fid) || fid_is_quota(fid)) + fid_is_name_llog(fid) || fid_is_quota(fid) || + fid_is_igif(fid)) RETURN(0); rc = osd_fld_lookup(env, osd, fid_seq(fid), range); if (rc != 0) { + /* During upgrade, OST FLDB might not be loaded because + * OST FLDB is not created until 2.6, so if some DNE + * filesystem upgrade from 2.5 to 2.7/2.8, they will + * not be able to find the sequence from local FLDB + * cache see fld_index_init(). */ + if (rc == -ENOENT && osd->od_is_ost) + RETURN(1); + if (rc != -ENOENT) CERROR("%s: "DFID" lookup failed: rc = %d\n", osd_name(osd), PFID(fid), rc); @@ -264,7 +332,7 @@ int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd, } static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list, - obd_seq seq) + u64 seq) { struct osd_seq *osd_seq; @@ -275,8 +343,7 @@ static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list, return NULL; } -static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, - obd_seq seq) +static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, u64 seq) { struct osd_seq *osd_seq; @@ -288,7 +355,7 @@ static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, } static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env, - struct osd_device *osd, obd_seq seq) + struct osd_device *osd, u64 seq) { struct osd_seq_list *seq_list = &osd->od_seq_list; struct osd_seq *osd_seq; @@ -321,14 +388,13 @@ static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env, /* Init subdir count to be 32, but each seq can have * different subdir count */ osd_seq->os_subdir_count = OSD_OST_MAP_SIZE; - OBD_ALLOC(osd_seq->os_compat_dirs, - sizeof(uint64_t) * osd_seq->os_subdir_count); + OBD_ALLOC_PTR_ARRAY(osd_seq->os_compat_dirs, osd_seq->os_subdir_count); if (osd_seq->os_compat_dirs == NULL) GOTO(out, rc = -ENOMEM); oi.oi_zapid = osd->od_O_id; sprintf(seq_name, (fid_seq_is_rsvd(seq) || - fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i, + fid_seq_is_mdt0(seq)) ? "%llu" : "%llx", fid_seq_is_idif(seq) ? 0 : seq); rc = osd_oi_find_or_create(env, osd, oi.oi_zapid, seq_name, &odb); @@ -338,6 +404,7 @@ static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env, GOTO(out, rc); } + osd_seq->os_oid = odb; for (i = 0; i < OSD_OST_MAP_SIZE; i++) { sprintf(key, "d%d", i); rc = osd_oi_find_or_create(env, osd, odb, key, &sdb); @@ -353,8 +420,8 @@ out: up(&seq_list->osl_seq_init_sem); if (rc != 0) { if (osd_seq != NULL && osd_seq->os_compat_dirs != NULL) - OBD_FREE(osd_seq->os_compat_dirs, - sizeof(uint64_t) * osd_seq->os_subdir_count); + OBD_FREE_PTR_ARRAY(osd_seq->os_compat_dirs, + osd_seq->os_subdir_count); if (osd_seq != NULL) OBD_FREE_PTR(osd_seq); osd_seq = ERR_PTR(rc); @@ -362,18 +429,13 @@ out: RETURN(osd_seq); } -/* - * objects w/o a natural reference (unlike a file on a MDS) - * are put under a special hierarchy /O//d0..dXX - * this function returns a directory specific fid belongs to - */ static uint64_t -osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf) +osd_get_idx_for_ost_obj_compat(const struct lu_env *env, struct osd_device *osd, + const struct lu_fid *fid, char *buf, int bufsize) { struct osd_seq *osd_seq; unsigned long b; - obd_id id; + u64 id; int rc; osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid)); @@ -394,15 +456,51 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, b = id % OSD_OST_MAP_SIZE; LASSERT(osd_seq->os_compat_dirs[b]); - sprintf(buf, LPU64, id); + if (buf) + snprintf(buf, bufsize, "%llu", id); return osd_seq->os_compat_dirs[b]; } -/* XXX: f_ver is not counted, but may differ too */ -static void osd_fid2str(char *buf, const struct lu_fid *fid) +/* + * objects w/o a natural reference (unlike a file on a MDS) + * are put under a special hierarchy /O//d0..dXX + * this function returns a directory specific fid belongs to + */ +static uint64_t +osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, + const struct lu_fid *fid, char *buf, int bufsize) { - sprintf(buf, DFID_NOBRACE, PFID(fid)); + struct osd_seq *osd_seq; + unsigned long b; + u64 id; + int rc; + + osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid)); + if (IS_ERR(osd_seq)) { + CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd), + PFID(fid)); + return PTR_ERR(osd_seq); + } + + if (fid_is_last_id(fid)) { + if (buf) + snprintf(buf, bufsize, "LAST_ID"); + + return osd_seq->os_oid; + } + + rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid); + LASSERT(rc == 0); /* we should not get here with IGIF */ + + id = ostid_id(&osd_oti_get(env)->oti_ostid); + b = id % OSD_OST_MAP_SIZE; + LASSERT(osd_seq->os_compat_dirs[b]); + + if (buf) + snprintf(buf, bufsize, "%llu", id); + + return osd_seq->os_compat_dirs[b]; } /* @@ -413,41 +511,83 @@ static void osd_fid2str(char *buf, const struct lu_fid *fid) */ static uint64_t osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid, - char *buf) + char *buf, dnode_t **zdn, int bufsize) { struct osd_oi *oi; - LASSERT(osd->od_oi_table != NULL); - oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)]; - osd_fid2str(buf, fid); + oi = osd_fid2oi(osd, fid); + if (buf) + osd_fid2str(buf, fid, bufsize); + if (zdn) + *zdn = oi->oi_dn; return oi->oi_zapid; } +uint64_t +osd_get_name_n_idx_compat(const struct lu_env *env, struct osd_device *osd, + const struct lu_fid *fid, char *buf, int bufsize, + dnode_t **zdn) +{ + uint64_t zapid; + + LASSERT(fid); + LASSERT(!fid_is_acct(fid)); + + if (zdn != NULL) + *zdn = NULL; + + if (fid_is_echo(fid) || fid_is_on_ost(env, osd, fid)) { + zapid = osd_get_idx_for_ost_obj_compat(env, osd, fid, + buf, bufsize); + } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) { + /* special objects with fixed known fids get their name */ + char *name = oid2name(fid_oid(fid)); + + if (name) { + zapid = osd->od_root; + if (buf) + strncpy(buf, name, bufsize); + } else { + zapid = osd_get_idx_for_fid(osd, fid, buf, NULL, + bufsize); + } + } else { + zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize); + } + + return zapid; +} + uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf) + const struct lu_fid *fid, char *buf, int bufsize, + dnode_t **zdn) { uint64_t zapid; LASSERT(fid); - LASSERT(buf); + LASSERT(!fid_is_acct(fid)); + + if (zdn != NULL) + *zdn = NULL; - if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) { - zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf); + if (fid_is_echo(fid) || fid_is_last_id(fid) || + fid_is_on_ost(env, osd, fid)) { + zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize); } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) { /* special objects with fixed known fids get their name */ char *name = oid2name(fid_oid(fid)); if (name) { zapid = osd->od_root; - strcpy(buf, name); - if (fid_is_acct(fid)) - zapid = MASTER_NODE_OBJ; + if (buf) + strncpy(buf, name, bufsize); } else { - zapid = osd_get_idx_for_fid(osd, fid, buf); + zapid = osd_get_idx_for_fid(osd, fid, buf, NULL, + bufsize); } } else { - zapid = osd_get_idx_for_fid(osd, fid, buf); + zapid = osd_get_idx_for_fid(osd, fid, buf, zdn, bufsize); } return zapid; @@ -465,32 +605,45 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev, { struct osd_thread_info *info = osd_oti_get(env); char *buf = info->oti_buf; - uint64_t zapid; + dnode_t *zdn; + uint64_t zapid; int rc = 0; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) + if (OBD_FAIL_CHECK(OBD_FAIL_SRV_ENOENT)) RETURN(-ENOENT); - if (unlikely(fid_is_acct(fid))) { - if (fid_oid(fid) == ACCT_USER_OID) - *oid = dev->od_iusr_oid; - else - *oid = dev->od_igrp_oid; - } else if (unlikely(fid_is_fs_root(fid))) { + LASSERT(!fid_is_acct(fid)); + + if (unlikely(fid_is_fs_root(fid))) { *oid = dev->od_root; } else { - zapid = osd_get_name_n_idx(env, dev, fid, buf); + zapid = osd_get_name_n_idx(env, dev, fid, buf, + sizeof(info->oti_buf), &zdn); + rc = osd_zap_lookup(dev, zapid, zdn, buf, + 8, 1, &info->oti_zde); + if (rc == -ENOENT) { + if (unlikely(fid_is_last_id(fid))) { + zapid = osd_get_name_n_idx_compat(env, dev, fid, + buf, sizeof(info->oti_buf), &zdn); + rc = osd_zap_lookup(dev, zapid, zdn, buf, + 8, 1, &info->oti_zde); + } else if (fid_is_objseq(fid) || fid_is_batchid(fid)) { + zapid = osd_get_idx_for_fid(dev, fid, + buf, NULL, sizeof(info->oti_buf)); + rc = osd_zap_lookup(dev, zapid, zdn, buf, + 8, 1, &info->oti_zde); + } + } - rc = -zap_lookup(dev->od_os, zapid, buf, - 8, 1, &info->oti_zde); if (rc) RETURN(rc); *oid = info->oti_zde.lzd_reg.zde_dnode; } if (rc == 0) - dmu_prefetch(dev->od_os, *oid, 0, 0); + osd_dmu_prefetch(dev->od_os, *oid, 0, 0, 0, + ZIO_PRIORITY_ASYNC_READ); RETURN(rc); } @@ -507,6 +660,8 @@ osd_oi_remove_table(const struct lu_env *env, struct osd_device *o, int key) oi = o->od_oi_table[key]; if (oi) { + if (oi->oi_dn) + osd_dnode_rele(oi->oi_dn); OBD_FREE_PTR(oi); o->od_oi_table[key] = NULL; } @@ -536,6 +691,7 @@ osd_oi_add_table(const struct lu_env *env, struct osd_device *o, } o->od_oi_table[key] = oi; + __osd_obj2dnode(o->od_os, oi->oi_zapid, &oi->oi_dn); return 0; } @@ -577,13 +733,15 @@ osd_oi_open_table(const struct lu_env *env, struct osd_device *o, int count) /** * Determine if the type and number of OIs used by this file system. */ -static int -osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count) +static int osd_oi_probe(const struct lu_env *env, struct osd_device *o) { - uint64_t root_oid = o->od_root; - struct osd_oi oi; - char name[16]; - int rc; + struct lustre_scrub *scrub = &o->od_scrub; + struct scrub_file *sf = &scrub->os_file; + struct osd_oi oi; + char name[16]; + int max = sf->sf_oi_count > 0 ? sf->sf_oi_count : OSD_OI_FID_NR_MAX; + int count; + int rc; ENTRY; /* @@ -592,40 +750,26 @@ osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count) * The only safeguard is that we know the number of OIs must be a * power of two and this is checked for basic sanity. */ - for (*count = 0; *count < OSD_OI_FID_NR_MAX; (*count)++) { - sprintf(name, "%s.%d", DMU_OSD_OI_NAME_BASE, *count); - rc = osd_oi_lookup(env, o, root_oid, name, &oi); - if (rc == 0) + for (count = 0; count < max; count++) { + snprintf(name, sizeof(name) - 1, "%s.%d", + DMU_OSD_OI_NAME_BASE, count); + rc = osd_oi_lookup(env, o, o->od_root, name, &oi); + if (!rc) continue; if (rc == -ENOENT) { - if (*count == 0) - break; - - if ((*count & (*count - 1)) != 0) - RETURN(-EDOM); + if (sf->sf_oi_count == 0) + RETURN(count); - RETURN(0); + zfs_set_bit(count, sf->sf_oi_bitmap); + continue; } - RETURN(rc); + if (rc) + RETURN(rc); } - /* - * No OIs exist, this must be a new filesystem. - */ - *count = 0; - - RETURN(0); -} - -static void osd_ost_seq_init(const struct lu_env *env, struct osd_device *osd) -{ - struct osd_seq_list *osl = &osd->od_seq_list; - - INIT_LIST_HEAD(&osl->osl_seq_list); - rwlock_init(&osl->osl_seq_list_lock); - sema_init(&osl->osl_seq_init_sem, 1); + RETURN(count); } static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd) @@ -637,13 +781,11 @@ static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd) list_for_each_entry_safe(osd_seq, tmp, &osl->osl_seq_list, os_seq_list) { list_del(&osd_seq->os_seq_list); - OBD_FREE(osd_seq->os_compat_dirs, - sizeof(uint64_t) * osd_seq->os_subdir_count); + OBD_FREE_PTR_ARRAY(osd_seq->os_compat_dirs, + osd_seq->os_subdir_count); OBD_FREE(osd_seq, sizeof(*osd_seq)); } write_unlock(&osl->osl_seq_list_lock); - - return; } /** @@ -652,135 +794,51 @@ static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd) static int osd_oi_init_compat(const struct lu_env *env, struct osd_device *o) { - uint64_t odb, sdb; - int rc; + uint64_t sdb; + int rc; ENTRY; rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb); - if (rc) - RETURN(rc); - - o->od_O_id = sdb; - - osd_ost_seq_init(env, o); - /* Create on-disk indexes to maintain per-UID/GID inode usage. - * Those new indexes are created in the top-level ZAP outside the - * namespace in order not to confuse ZPL which might interpret those - * indexes as directories and assume the values are object IDs */ - rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ, - oid2name(ACCT_USER_OID), &odb); - if (rc) - RETURN(rc); - o->od_iusr_oid = odb; - - rc = osd_oi_find_or_create(env, o, MASTER_NODE_OBJ, - oid2name(ACCT_GROUP_OID), &odb); - if (rc) - RETURN(rc); - o->od_igrp_oid = odb; + if (!rc) + o->od_O_id = sdb; RETURN(rc); } -static char *root2convert = "ROOT"; -/* - * due to DNE requirements we have to change sequence of /ROOT object - * so that it doesn't belong to the local sequence FID_SEQ_LOCAL_FILE - * but a normal sequence living on MDS#0 - * this is the sole purpose of this function. - * - * This is only needed for pre-production 2.4 ZFS filesystems, and - * can be removed in the future. - */ -int osd_convert_root_to_new_seq(const struct lu_env *env, - struct osd_device *o) -{ - struct luz_direntry *lze = &osd_oti_get(env)->oti_zde; - char *buf = osd_oti_get(env)->oti_str; - struct lu_fid newfid; - uint64_t zapid; - dmu_tx_t *tx = NULL; - int rc; +static int +osd_oi_init_index_backup(const struct lu_env *env, struct osd_device *o) +{ + struct lu_fid *fid = &osd_oti_get(env)->oti_fid; + int rc; ENTRY; - /* ignore OSTs */ - if (strstr(o->od_svname, "MDT") == NULL) - RETURN(0); - - /* lookup /ROOT */ - rc = -zap_lookup(o->od_os, o->od_root, root2convert, 8, - sizeof(*lze) / 8, (void *)lze); - /* doesn't exist or let actual user to handle the error */ - if (rc) - RETURN(0); - - CDEBUG(D_OTHER, "%s: /ROOT -> "DFID" -> "LPU64"\n", o->od_svname, - PFID(&lze->lzd_fid), (long long int) lze->lzd_reg.zde_dnode); - - /* already right one? */ - if (fid_seq(&lze->lzd_fid) == FID_SEQ_ROOT) - return 0; - - tx = dmu_tx_create(o->od_os); - if (tx == NULL) - return -ENOMEM; - - dmu_tx_hold_bonus(tx, o->od_root); - - /* declare delete/insert of the name */ - dmu_tx_hold_zap(tx, o->od_root, TRUE, root2convert); - dmu_tx_hold_zap(tx, o->od_root, FALSE, root2convert); - - /* declare that we'll remove object from fid-dnode mapping */ - zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf); - dmu_tx_hold_bonus(tx, zapid); - dmu_tx_hold_zap(tx, zapid, FALSE, buf); - - /* declare that we'll add object to fid-dnode mapping */ - newfid.f_seq = FID_SEQ_ROOT; - newfid.f_oid = 1; - newfid.f_ver = 0; - zapid = osd_get_name_n_idx(env, o, &newfid, buf); - dmu_tx_hold_bonus(tx, zapid); - dmu_tx_hold_zap(tx, zapid, TRUE, buf); - - rc = -dmu_tx_assign(tx, TXG_WAIT); - if (rc) - GOTO(err, rc); - - rc = -zap_remove(o->od_os, o->od_root, root2convert, tx); - if (rc) - GOTO(err, rc); - - /* remove from OI */ - zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf); - rc = -zap_remove(o->od_os, zapid, buf, tx); - if (rc) - GOTO(err, rc); - - lze->lzd_fid = newfid; - rc = -zap_add(o->od_os, o->od_root, root2convert, - 8, sizeof(*lze) / 8, (void *)lze, tx); - if (rc) - GOTO(err, rc); - - /* add to OI with the new fid */ - zapid = osd_get_name_n_idx(env, o, &newfid, buf); - rc = -zap_add(o->od_os, zapid, buf, 8, 1, &lze->lzd_reg, tx); - if (rc) - GOTO(err, rc); - - - /* LMA will be updated in mdd_compat_fixes */ - dmu_tx_commit(tx); + lu_local_obj_fid(fid, INDEX_BACKUP_OID); + rc = osd_obj_find_or_create(env, o, o->od_root, INDEX_BACKUP_DIR, + &o->od_index_backup_id, fid, true); RETURN(rc); +} -err: - if (tx) - dmu_tx_abort(tx); - CERROR("%s: can't convert to new fid: rc = %d\n", o->od_svname, rc); - RETURN(rc); +static void +osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o) +{ + uint64_t sdb; + int rc; + ENTRY; + + if (o->od_is_ost) { + o->od_remote_parent_dir = ZFS_NO_OBJECT; + } else { + /* Remote parent only used for cross-MDT objects, + * it is usless for single MDT case or under read + * only mode. So ignore the failure. */ + rc = osd_oi_find_or_create(env, o, o->od_root, + REMOTE_PARENT_DIR, &sdb); + if (!rc) + o->od_remote_parent_dir = sdb; + else + o->od_remote_parent_dir = ZFS_NO_OBJECT; + } } /** @@ -788,45 +846,100 @@ err: */ int osd_oi_init(const struct lu_env *env, struct osd_device *o) { - char *key = osd_oti_get(env)->oti_buf; - int i, rc, count = 0; + struct lustre_scrub *scrub = &o->od_scrub; + struct scrub_file *sf = &scrub->os_file; + char *key = osd_oti_get(env)->oti_buf; + uint64_t sdb; + int i, rc, count; ENTRY; - rc = osd_oi_probe(env, o, &count); + LASSERTF((sf->sf_oi_count & (sf->sf_oi_count - 1)) == 0, + "Invalid OI count in scrub file %d\n", sf->sf_oi_count); + + rc = osd_oi_init_index_backup(env, o); if (rc) RETURN(rc); - if (count == 0) { - uint64_t odb, sdb; + osd_oi_init_remote_parent(env, o); - count = osd_oi_count; - odb = o->od_root; + rc = osd_oi_init_compat(env, o); + if (rc) + RETURN(rc); + + count = osd_oi_probe(env, o); + if (count < 0) + GOTO(out, rc = count); + + if (count > 0) { + if (count == sf->sf_oi_count) + goto open; + + if (sf->sf_oi_count == 0) { + if (likely((count & (count - 1)) == 0)) { + sf->sf_oi_count = count; + rc = scrub_file_store(env, scrub); + if (rc) + GOTO(out, rc); + + goto open; + } - for (i = 0; i < count; i++) { - sprintf(key, "%s.%d", DMU_OSD_OI_NAME_BASE, i); - rc = osd_oi_find_or_create(env, o, odb, key, &sdb); - if (rc) - RETURN(rc); + LCONSOLE_ERROR("%s: invalid oi count %d. You can " + "remove all OIs, then remount it\n", + osd_name(o), count); + GOTO(out, rc = -EDOM); + } + + scrub_file_reset(scrub, o->od_uuid, SF_RECREATED); + count = sf->sf_oi_count; + } else { + if (sf->sf_oi_count > 0) { + count = sf->sf_oi_count; + memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE); + for (i = 0; i < count; i++) + zfs_set_bit(i, sf->sf_oi_bitmap); + scrub_file_reset(scrub, o->od_uuid, SF_RECREATED); + } else { + count = sf->sf_oi_count = osd_oi_count; } } - rc = osd_oi_init_compat(env, o); + rc = scrub_file_store(env, scrub); if (rc) - RETURN(rc); + GOTO(out, rc); + + for (i = 0; i < count; i++) { + LASSERT(sizeof(osd_oti_get(env)->oti_buf) >= 32); + snprintf(key, sizeof(osd_oti_get(env)->oti_buf) - 1, + "%s.%d", DMU_OSD_OI_NAME_BASE, i); + rc = osd_oi_find_or_create(env, o, o->od_root, key, &sdb); + if (rc) + GOTO(out, rc); + } + +open: LASSERT((count & (count - 1)) == 0); o->od_oi_count = count; - OBD_ALLOC(o->od_oi_table, sizeof(struct osd_oi *) * count); + OBD_ALLOC_PTR_ARRAY(o->od_oi_table, count); if (o->od_oi_table == NULL) - RETURN(-ENOMEM); + GOTO(out, rc = -ENOMEM); rc = osd_oi_open_table(env, o, count); + + GOTO(out, rc); + +out: if (rc) { - OBD_FREE(o->od_oi_table, sizeof(struct osd_oi *) * count); - o->od_oi_table = NULL; + osd_ost_seq_fini(env, o); + + if (o->od_oi_table) { + OBD_FREE_PTR_ARRAY(o->od_oi_table, count); + o->od_oi_table = NULL; + } } - RETURN(rc); + return rc; } void osd_oi_fini(const struct lu_env *env, struct osd_device *o) @@ -837,8 +950,7 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o) if (o->od_oi_table != NULL) { (void) osd_oi_close_table(env, o); - OBD_FREE(o->od_oi_table, - sizeof(struct osd_oi *) * o->od_oi_count); + OBD_FREE_PTR_ARRAY(o->od_oi_table, o->od_oi_count); o->od_oi_table = NULL; o->od_oi_count = 0; } @@ -861,4 +973,173 @@ int osd_options_init(void) return 0; } +/* + * the following set of functions are used to maintain per-thread + * cache of FID->ino mapping. this mechanism is used to avoid + * expensive LU/OI lookups. + */ +struct osd_idmap_cache *osd_idc_find(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_idmap_cache *idc = oti->oti_ins_cache; + int i; + + for (i = 0; i < oti->oti_ins_cache_used; i++) { + if (!lu_fid_eq(&idc[i].oic_fid, fid)) + continue; + if (idc[i].oic_dev != osd) + continue; + + return idc + i; + } + + return NULL; +} + +struct osd_idmap_cache *osd_idc_add(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_idmap_cache *idc; + int i; + + if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) { + i = oti->oti_ins_cache_size * 2; + if (i == 0) + i = OSD_INS_CACHE_SIZE; + OBD_ALLOC_PTR_ARRAY_LARGE(idc, i); + if (idc == NULL) + return ERR_PTR(-ENOMEM); + if (oti->oti_ins_cache != NULL) { + memcpy(idc, oti->oti_ins_cache, + oti->oti_ins_cache_used * sizeof(*idc)); + OBD_FREE_PTR_ARRAY_LARGE(oti->oti_ins_cache, + oti->oti_ins_cache_used); + } + oti->oti_ins_cache = idc; + oti->oti_ins_cache_size = i; + } + + idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++]; + idc->oic_fid = *fid; + idc->oic_dev = osd; + idc->oic_dnode = 0; + idc->oic_remote = 0; + + return idc; +} + +/** + * Lookup mapping for the given fid in the cache + * + * Initialize a new one if not found. the initialization checks whether + * the object is local or remote. for the local objects, OI is used to + * learn dnode#. the function is used when the caller has no information + * about the object, e.g. at dt_insert(). + */ +struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid) +{ + struct osd_idmap_cache *idc; + int rc; + + LASSERT(!fid_is_acct(fid)); + + idc = osd_idc_find(env, osd, fid); + if (idc != NULL) + return idc; + + CDEBUG(D_INODE, "%s: FID "DFID" not in the id map cache\n", + osd->od_svname, PFID(fid)); + + /* new mapping is needed */ + idc = osd_idc_add(env, osd, fid); + if (IS_ERR(idc)) { + CERROR("%s: FID "DFID" add id map cache failed: %ld\n", + osd->od_svname, PFID(fid), PTR_ERR(idc)); + return idc; + } + + /* initialize it */ + rc = osd_remote_fid(env, osd, fid); + if (unlikely(rc < 0)) + return ERR_PTR(rc); + + if (rc == 0) { + /* the object is local, lookup in OI */ + uint64_t dnode; + + rc = osd_fid_lookup(env, osd, fid, &dnode); + if (unlikely(rc < 0)) { + CERROR("%s: can't lookup: rc = %d\n", + osd->od_svname, rc); + return ERR_PTR(rc); + } + LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT)); + idc->oic_dnode = dnode; + } else { + /* the object is remote */ + idc->oic_remote = 1; + } + + return idc; +} + +/* + * lookup mapping for given FID and fill it from the given object. + * the object is local by definition. + */ +int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd, + struct osd_object *obj) +{ + const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); + struct osd_idmap_cache *idc; + + idc = osd_idc_find(env, osd, fid); + if (idc != NULL) { + if (obj->oo_dn == NULL) + return 0; + idc->oic_dnode = obj->oo_dn->dn_object; + return 0; + } + + CDEBUG(D_INODE, "%s: FID "DFID" not in the id map cache\n", + osd->od_svname, PFID(fid)); + + /* new mapping is needed */ + idc = osd_idc_add(env, osd, fid); + if (IS_ERR(idc)) { + CERROR("%s: FID "DFID" add id map cache failed: %ld\n", + osd->od_svname, PFID(fid), PTR_ERR(idc)); + return PTR_ERR(idc); + } + + if (obj->oo_dn) + idc->oic_dnode = obj->oo_dn->dn_object; + + return 0; +} + +int osd_idc_find_and_init_with_oid(const struct lu_env *env, + struct osd_device *osd, + const struct lu_fid *fid, + uint64_t oid) +{ + struct osd_idmap_cache *idc; + + idc = osd_idc_find(env, osd, fid); + if (!idc) { + idc = osd_idc_add(env, osd, fid); + if (IS_ERR(idc)) + return PTR_ERR(idc); + } + + idc->oic_dnode = oid; + idc->oic_remote = 0; + return 0; +}