X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosd-zfs%2Fosd_oi.c;h=00ce2ea2215718a7b387ee5a4a160c9f69ce82a8;hp=0df1568e611bb0f881fcd8ddbd9bf0216cea67fe;hb=85e91e77667b796a2ef73e1610b16383b9b188c3;hpb=e3c85d27cb4b3fc59181ed52b5090e812a1ab0ae diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 0df1568..00ce2ea 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -28,7 +28,7 @@ * Use is subject to license terms. */ /* - * Copyright (c) 2011, 2012 Whamcloud, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. * Use is subject to license terms. */ /* @@ -40,11 +40,9 @@ * * Author: Alex Zhuravlev * Author: Mike Pershin + * Author: Di Wang */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif #define DEBUG_SUBSYSTEM S_OSD #include @@ -89,22 +87,17 @@ struct named_oid { }; static const struct named_oid oids[] = { - { OFD_LAST_RECV_OID, LAST_RCVD }, + { LAST_RECV_OID, LAST_RCVD }, { OFD_LAST_GROUP_OID, "LAST_GROUP" }, { LLOG_CATALOGS_OID, "CATALOGS" }, { MGS_CONFIGS_OID, NULL /*MOUNT_CONFIGS_DIR*/ }, - { FID_SEQ_SRV_OID, NULL /* "seq_srv" */}, - { FID_SEQ_CTL_OID, NULL /*"seq_ctl"*/ }, - { MDD_CAPA_KEYS_OID, NULL /*CAPA_KEYS*/ }, - { FLD_INDEX_OID, NULL /* "fld" */ }, + { FID_SEQ_SRV_OID, "seq_srv" }, + { FID_SEQ_CTL_OID, "seq_ctl" }, + { FLD_INDEX_OID, "fld" }, { MDD_LOV_OBJ_OID, LOV_OBJID }, - { MDT_LAST_RECV_OID, LAST_RCVD }, { OFD_HEALTH_CHECK_OID, HEALTH_CHECK }, - { OFD_GROUP0_LAST_OID, "LAST_ID" }, { ACCT_USER_OID, "acct_usr_inode" }, { ACCT_GROUP_OID, "acct_grp_inode" }, - { MDD_ROOT_INDEX_OID, NULL }, - { MDD_ORPHAN_OID, NULL }, { 0, NULL } }; @@ -120,6 +113,253 @@ static char *oid2name(const unsigned long oid) return NULL; } +/** + * Lookup an existing OI by the given name. + */ +static int +osd_oi_lookup(const struct lu_env *env, struct osd_device *o, + uint64_t parent, const char *name, struct osd_oi *oi) +{ + struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; + int rc; + + rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde); + if (rc) + return rc; + + strncpy(oi->oi_name, name, OSD_OI_NAME_SIZE - 1); + oi->oi_zapid = zde->zde_dnode; + + return rc; +} + +/** + * Create a new OI with the given name. + */ +static int +osd_oi_create(const struct lu_env *env, struct osd_device *o, + uint64_t parent, const char *name, uint64_t *child) +{ + struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; + struct lu_attr *la = &osd_oti_get(env)->oti_la; + dmu_buf_t *db; + dmu_tx_t *tx; + int rc; + + /* verify it doesn't already exist */ + rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde); + if (rc == 0) + return -EEXIST; + + /* create fid-to-dnode index */ + tx = dmu_tx_create(o->od_objset.os); + if (tx == NULL) + return -ENOMEM; + + dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL); + dmu_tx_hold_bonus(tx, parent); + dmu_tx_hold_zap(tx, parent, TRUE, name); + LASSERT(tx->tx_objset->os_sa); + dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE); + + rc = -dmu_tx_assign(tx, TXG_WAIT); + if (rc) { + dmu_tx_abort(tx); + return rc; + } + + la->la_valid = LA_MODE | LA_UID | LA_GID; + la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; + la->la_uid = la->la_gid = 0; + __osd_zap_create(env, &o->od_objset, &db, tx, la, parent, oi_tag, 0); + + zde->zde_dnode = db->db_object; + zde->zde_pad = 0; + zde->zde_type = IFTODT(S_IFDIR); + + rc = -zap_add(o->od_objset.os, parent, name, 8, 1, (void *)zde, tx); + + dmu_tx_commit(tx); + + *child = db->db_object; + sa_buf_rele(db, oi_tag); + + return rc; +} + +static int +osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o, + uint64_t parent, const char *name, uint64_t *child) +{ + struct osd_oi oi; + int rc; + + rc = osd_oi_lookup(env, o, parent, name, &oi); + if (rc == 0) + *child = oi.oi_zapid; + else if (rc == -ENOENT) + rc = osd_oi_create(env, o, parent, name, child); + + return rc; +} + +/** + * Lookup the target index/flags of the fid, so it will know where + * the object is located (tgt index) and it is MDT or OST object. + */ +int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd, + obd_seq seq, struct lu_seq_range *range) +{ + struct seq_server_site *ss = osd_seq_site(osd); + + if (fid_seq_is_idif(seq)) { + fld_range_set_ost(range); + range->lsr_index = idif_ost_idx(seq); + return 0; + } + + if (!fid_seq_in_fldb(seq)) { + fld_range_set_mdt(range); + if (ss != NULL) + /* FIXME: If ss is NULL, it suppose not get lsr_index + * at all */ + range->lsr_index = ss->ss_node_id; + return 0; + } + + LASSERT(ss != NULL); + fld_range_set_any(range); + /* OSD will only do local fld lookup */ + return fld_local_lookup(env, ss->ss_server_fld, seq, range); +} + +int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd, + const struct lu_fid *fid) +{ + struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range; + int rc; + ENTRY; + + if (fid_is_idif(fid)) + RETURN(1); + + if (unlikely(fid_is_local_file(fid) || fid_is_llog(fid)) || + fid_is_name_llog(fid) || fid_is_quota(fid)) + RETURN(0); + + rc = osd_fld_lookup(env, osd, fid_seq(fid), range); + if (rc != 0) { + if (rc != -ENOENT) + CERROR("%s: "DFID" lookup failed: rc = %d\n", + osd_name(osd), PFID(fid), rc); + RETURN(0); + } + + if (fld_range_is_ost(range)) + RETURN(1); + + RETURN(0); +} + +static struct osd_seq *osd_seq_find_locked(struct osd_seq_list *seq_list, + obd_seq seq) +{ + struct osd_seq *osd_seq; + + cfs_list_for_each_entry(osd_seq, &seq_list->osl_seq_list, os_seq_list) { + if (osd_seq->os_seq == seq) + return osd_seq; + } + return NULL; +} + +static struct osd_seq *osd_seq_find(struct osd_seq_list *seq_list, + obd_seq seq) +{ + struct osd_seq *osd_seq; + + read_lock(&seq_list->osl_seq_list_lock); + osd_seq = osd_seq_find_locked(seq_list, seq); + read_unlock(&seq_list->osl_seq_list_lock); + + return osd_seq; +} + +static struct osd_seq *osd_find_or_add_seq(const struct lu_env *env, + struct osd_device *osd, obd_seq seq) +{ + struct osd_seq_list *seq_list = &osd->od_seq_list; + struct osd_seq *osd_seq; + char *key = osd_oti_get(env)->oti_buf; + char *seq_name = osd_oti_get(env)->oti_str; + struct osd_oi oi; + uint64_t sdb, odb; + int i; + int rc = 0; + ENTRY; + + osd_seq = osd_seq_find(seq_list, seq); + if (osd_seq != NULL) + RETURN(osd_seq); + + down(&seq_list->osl_seq_init_sem); + /* Check again, in case some one else already add it + * to the list */ + osd_seq = osd_seq_find(seq_list, seq); + if (osd_seq != NULL) + GOTO(out, rc = 0); + + OBD_ALLOC_PTR(osd_seq); + if (osd_seq == NULL) + GOTO(out, rc = -ENOMEM); + + CFS_INIT_LIST_HEAD(&osd_seq->os_seq_list); + osd_seq->os_seq = seq; + + /* Init subdir count to be 32, but each seq can have + * different subdir count */ + osd_seq->os_subdir_count = OSD_OST_MAP_SIZE; + OBD_ALLOC(osd_seq->os_compat_dirs, + sizeof(uint64_t) * osd_seq->os_subdir_count); + if (osd_seq->os_compat_dirs == NULL) + GOTO(out, rc = -ENOMEM); + + oi.oi_zapid = osd->od_O_id; + sprintf(seq_name, (fid_seq_is_rsvd(seq) || + fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i, + fid_seq_is_idif(seq) ? 0 : seq); + + rc = osd_oi_find_or_create(env, osd, oi.oi_zapid, seq_name, &odb); + if (rc != 0) { + CERROR("%s: Can not create %s : rc = %d\n", + osd_name(osd), seq_name, rc); + GOTO(out, rc); + } + + for (i = 0; i < OSD_OST_MAP_SIZE; i++) { + sprintf(key, "d%d", i); + rc = osd_oi_find_or_create(env, osd, odb, key, &sdb); + if (rc) + GOTO(out, rc); + osd_seq->os_compat_dirs[i] = sdb; + } + + write_lock(&seq_list->osl_seq_list_lock); + cfs_list_add(&osd_seq->os_seq_list, &seq_list->osl_seq_list); + write_unlock(&seq_list->osl_seq_list_lock); +out: + up(&seq_list->osl_seq_init_sem); + if (rc != 0) { + if (osd_seq != NULL && osd_seq->os_compat_dirs != NULL) + OBD_FREE(osd_seq->os_compat_dirs, + sizeof(uint64_t) * osd_seq->os_subdir_count); + if (osd_seq != NULL) + OBD_FREE_PTR(osd_seq); + osd_seq = ERR_PTR(rc); + } + RETURN(osd_seq); +} + /* * objects w/o a natural reference (unlike a file on a MDS) * are put under a special hierarchy /O//d0..dXX @@ -129,17 +369,25 @@ static uint64_t osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, const struct lu_fid *fid, char *buf) { + struct osd_seq *osd_seq; unsigned long b; int rc; - rc = fid_ostid_pack(fid, &osd_oti_get(env)->oti_ostid); + osd_seq = osd_find_or_add_seq(env, osd, fid_seq(fid)); + if (IS_ERR(osd_seq)) { + CERROR("%s: Can not find seq group "DFID"\n", osd_name(osd), + PFID(fid)); + return PTR_ERR(osd_seq); + } + + rc = fid_to_ostid(fid, &osd_oti_get(env)->oti_ostid); LASSERT(rc == 0); /* we should not get here with IGIF */ - b = osd_oti_get(env)->oti_ostid.oi_id % OSD_OST_MAP_SIZE; - LASSERT(osd->od_ost_compat_dirs[b]); + b = ostid_id(&osd_oti_get(env)->oti_ostid) % OSD_OST_MAP_SIZE; + LASSERT(osd_seq->os_compat_dirs[b]); - sprintf(buf, LPU64, osd_oti_get(env)->oti_ostid.oi_id); + sprintf(buf, LPU64, ostid_id(&osd_oti_get(env)->oti_ostid)); - return osd->od_ost_compat_dirs[b]; + return osd_seq->os_compat_dirs[b]; } /* XXX: f_ver is not counted, but may differ too */ @@ -175,7 +423,7 @@ uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, LASSERT(fid); LASSERT(buf); - if (fid_is_idif(fid)) { + if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) { zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf); } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) { /* special objects with fixed known fids get their name */ @@ -184,9 +432,7 @@ uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, if (name) { zapid = osd->od_root; strcpy(buf, name); - if (fid_oid(fid) == OFD_GROUP0_LAST_OID) - zapid = osd->od_ost_compat_grp0; - else if (fid_is_acct(fid)) + if (fid_is_acct(fid)) zapid = MASTER_NODE_OBJ; } else { zapid = osd_get_idx_for_fid(osd, fid, buf); @@ -238,97 +484,6 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev, } /** - * Lookup an existing OI by the given name. - */ -static int -osd_oi_lookup(const struct lu_env *env, struct osd_device *o, - uint64_t parent, const char *name, struct osd_oi *oi) -{ - struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; - int rc; - - rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde); - if (rc) - return rc; - - strncpy(oi->oi_name, name, OSD_OI_NAME_SIZE - 1); - oi->oi_zapid = zde->zde_dnode; - - return rc; -} - -/** - * Create a new OI with the given name. - */ -static int -osd_oi_create(const struct lu_env *env, struct osd_device *o, - uint64_t parent, const char *name, uint64_t *child) -{ - struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; - struct lu_attr *la = &osd_oti_get(env)->oti_la; - dmu_buf_t *db; - dmu_tx_t *tx; - int rc; - - /* verify it doesn't already exist */ - rc = -zap_lookup(o->od_objset.os, parent, name, 8, 1, (void *)zde); - if (rc == 0) - return -EEXIST; - - /* create fid-to-dnode index */ - tx = dmu_tx_create(o->od_objset.os); - if (tx == NULL) - return -ENOMEM; - - dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, 1, NULL); - dmu_tx_hold_bonus(tx, parent); - dmu_tx_hold_zap(tx, parent, TRUE, name); - LASSERT(tx->tx_objset->os_sa); - dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE); - - rc = -dmu_tx_assign(tx, TXG_WAIT); - if (rc) { - dmu_tx_abort(tx); - return rc; - } - - la->la_valid = LA_MODE | LA_UID | LA_GID; - la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; - la->la_uid = la->la_gid = 0; - __osd_zap_create(env, &o->od_objset, &db, tx, la, oi_tag, 0); - - zde->zde_dnode = db->db_object; - zde->zde_pad = 0; - zde->zde_type = IFTODT(S_IFDIR); - - rc = -zap_add(o->od_objset.os, parent, name, 8, 1, (void *)zde, tx); - - dmu_tx_commit(tx); - - *child = db->db_object; - sa_buf_rele(db, oi_tag); - - return rc; -} - -static int -osd_oi_find_or_create(const struct lu_env *env, struct osd_device *o, - uint64_t parent, const char *name, uint64_t *child) -{ - struct osd_oi oi; - int rc; - - rc = osd_oi_lookup(env, o, parent, name, &oi); - if (rc == 0) { - *child = oi.oi_zapid; - } else if (rc == -ENOENT) { - rc = osd_oi_create(env, o, parent, name, child); - } - - return rc; -} - -/** * Close an entry in a specific slot. */ static void @@ -452,37 +607,50 @@ osd_oi_probe(const struct lu_env *env, struct osd_device *o, int *count) RETURN(0); } +static void osd_ost_seq_init(const struct lu_env *env, struct osd_device *osd) +{ + struct osd_seq_list *osl = &osd->od_seq_list; + + CFS_INIT_LIST_HEAD(&osl->osl_seq_list); + rwlock_init(&osl->osl_seq_list_lock); + sema_init(&osl->osl_seq_init_sem, 1); +} + +static void osd_ost_seq_fini(const struct lu_env *env, struct osd_device *osd) +{ + struct osd_seq_list *osl = &osd->od_seq_list; + struct osd_seq *osd_seq, *tmp; + + write_lock(&osl->osl_seq_list_lock); + cfs_list_for_each_entry_safe(osd_seq, tmp, &osl->osl_seq_list, + os_seq_list) { + cfs_list_del(&osd_seq->os_seq_list); + OBD_FREE(osd_seq->os_compat_dirs, + sizeof(uint64_t) * osd_seq->os_subdir_count); + OBD_FREE(osd_seq, sizeof(*osd_seq)); + } + write_unlock(&osl->osl_seq_list_lock); + + return; +} + /** * Create /O subdirectory to map legacy OST objects for compatibility. */ static int osd_oi_init_compat(const struct lu_env *env, struct osd_device *o) { - char *key = osd_oti_get(env)->oti_buf; uint64_t odb, sdb; - int i, rc; + int rc; ENTRY; rc = osd_oi_find_or_create(env, o, o->od_root, "O", &sdb); if (rc) RETURN(rc); - /* create /O/0 subdirectory to map legacy OST objects */ - rc = osd_oi_find_or_create(env, o, sdb, "0", &odb); - if (rc) - RETURN(rc); - - o->od_ost_compat_grp0 = odb; - - for (i = 0; i < OSD_OST_MAP_SIZE; i++) { - sprintf(key, "d%d", i); - rc = osd_oi_find_or_create(env, o, odb, key, &sdb); - if (rc) - RETURN(rc); - - o->od_ost_compat_dirs[i] = sdb; - } + o->od_O_id = sdb; + osd_ost_seq_init(env, o); /* Create on-disk indexes to maintain per-UID/GID inode usage. * Those new indexes are created in the top-level ZAP outside the * namespace in order not to confuse ZPL which might interpret those @@ -502,6 +670,107 @@ osd_oi_init_compat(const struct lu_env *env, struct osd_device *o) RETURN(rc); } +static char *root2convert = "ROOT"; +/* + * due to DNE requirements we have to change sequence of /ROOT object + * so that it doesn't belong to the local sequence FID_SEQ_LOCAL_FILE + * but a normal sequence living on MDS#0 + * this is the sole purpose of this function. + * + * This is only needed for pre-production 2.4 ZFS filesystems, and + * can be removed in the future. + */ +int osd_convert_root_to_new_seq(const struct lu_env *env, + struct osd_device *o) +{ + struct luz_direntry *lze = &osd_oti_get(env)->oti_zde; + char *buf = osd_oti_get(env)->oti_str; + struct lu_fid newfid; + uint64_t zapid; + dmu_tx_t *tx = NULL; + int rc; + ENTRY; + + /* ignore OSTs */ + if (strstr(o->od_svname, "MDT") == NULL) + RETURN(0); + + /* lookup /ROOT */ + rc = -zap_lookup(o->od_objset.os, o->od_root, root2convert, 8, + sizeof(*lze) / 8, (void *)lze); + /* doesn't exist or let actual user to handle the error */ + if (rc) + RETURN(0); + + CDEBUG(D_OTHER, "%s: /ROOT -> "DFID" -> "LPU64"\n", o->od_svname, + PFID(&lze->lzd_fid), (long long int) lze->lzd_reg.zde_dnode); + + /* already right one? */ + if (fid_seq(&lze->lzd_fid) == FID_SEQ_ROOT) + return 0; + + tx = dmu_tx_create(o->od_objset.os); + if (tx == NULL) + return -ENOMEM; + + dmu_tx_hold_bonus(tx, o->od_root); + + /* declare delete/insert of the name */ + dmu_tx_hold_zap(tx, o->od_root, TRUE, root2convert); + dmu_tx_hold_zap(tx, o->od_root, FALSE, root2convert); + + /* declare that we'll remove object from fid-dnode mapping */ + zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf); + dmu_tx_hold_bonus(tx, zapid); + dmu_tx_hold_zap(tx, zapid, FALSE, buf); + + /* declare that we'll add object to fid-dnode mapping */ + newfid.f_seq = FID_SEQ_ROOT; + newfid.f_oid = 1; + newfid.f_ver = 0; + zapid = osd_get_name_n_idx(env, o, &newfid, buf); + dmu_tx_hold_bonus(tx, zapid); + dmu_tx_hold_zap(tx, zapid, TRUE, buf); + + rc = -dmu_tx_assign(tx, TXG_WAIT); + if (rc) + GOTO(err, rc); + + rc = -zap_remove(o->od_objset.os, o->od_root, root2convert, tx); + if (rc) + GOTO(err, rc); + + /* remove from OI */ + zapid = osd_get_name_n_idx(env, o, &lze->lzd_fid, buf); + rc = -zap_remove(o->od_objset.os, zapid, buf, tx); + if (rc) + GOTO(err, rc); + + lze->lzd_fid = newfid; + rc = -zap_add(o->od_objset.os, o->od_root, root2convert, + 8, sizeof(*lze) / 8, (void *)lze, tx); + if (rc) + GOTO(err, rc); + + /* add to OI with the new fid */ + zapid = osd_get_name_n_idx(env, o, &newfid, buf); + rc = -zap_add(o->od_objset.os, zapid, buf, 8, 1, &lze->lzd_reg, tx); + if (rc) + GOTO(err, rc); + + + /* LMA will be updated in mdd_compat_fixes */ + dmu_tx_commit(tx); + + RETURN(rc); + +err: + if (tx) + dmu_tx_abort(tx); + CERROR("%s: can't convert to new fid: rc = %d\n", o->od_svname, rc); + RETURN(rc); +} + /** * Initialize the OIs by either opening or creating them as needed. */ @@ -552,6 +821,8 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o) { ENTRY; + osd_ost_seq_fini(env, o); + if (o->od_oi_table != NULL) { (void) osd_oi_close_table(env, o); OBD_FREE(o->od_oi_table,