From d16c76ea58f70cdac6c0de0e4fdbe5e329951c33 Mon Sep 17 00:00:00 2001 From: Oleg Drokin Date: Fri, 2 Sep 2016 16:38:18 +0000 Subject: [PATCH] Revert "LU-7898 osd: remove unnecessary declarations" This patch causes build failures in master because the LU-7899 patch (6cd79ab5860c5) was reverted; I failed to catch this in time due to a deficiency in my build process. This cannot be easily fixed, since apparently a big chunk of functionality was yanked from under this patch, so I can only revert it for now. This reverts commit ead6df2feee9c143b617cb60e50e403c955bd401. Change-Id: I5ee89bf0c9260312f157c251b83dd417fa2cf260 Reviewed-on: http://review.whamcloud.com/22293 Reviewed-by: Oleg Drokin Tested-by: Oleg Drokin --- lustre/osd-zfs/osd_index.c | 30 +++-- lustre/osd-zfs/osd_internal.h | 27 ++--- lustre/osd-zfs/osd_io.c | 13 ++- lustre/osd-zfs/osd_object.c | 254 +++++++++++++++++++++--------------------- lustre/osd-zfs/osd_oi.c | 46 +++----- lustre/osd-zfs/osd_xattr.c | 82 ++++++-------- 6 files changed, 207 insertions(+), 245 deletions(-) diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 5f86160..8c1ffbf 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -165,6 +165,7 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_db); + LASSERT(osd_object_is_zap(obj->oo_db)); LASSERT(info); OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS); @@ -424,6 +425,8 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; + LASSERT(osd_object_is_zap(obj->oo_db)); + if (name[0] == '.') { if (name[1] == 0) { const struct lu_fid *f = lu_object_fid(&dt->do_lu); @@ -473,10 +476,8 @@ static int osd_declare_dir_insert(const struct lu_env *env, else object = obj->oo_db->db_object; - /* do not specify the key as then DMU is trying to look it up - * which is very expensive. 
usually the layers above lookup - * before insertion */ - dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL); + dmu_tx_hold_bonus(oh->ot_tx, object); + dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key); RETURN(0); } @@ -623,6 +624,7 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(parent->oo_db); + LASSERT(osd_object_is_zap(parent->oo_db)); LASSERT(dt_object_exists(dt)); LASSERT(osd_invariant(parent)); @@ -724,15 +726,12 @@ static int osd_declare_dir_delete(const struct lu_env *env, if (dt_object_exists(dt)) { LASSERT(obj->oo_db); + LASSERT(osd_object_is_zap(obj->oo_db)); dnode = obj->oo_db->db_object; } else { dnode = DMU_NEW_OBJECT; } - - /* do not specify the key as then DMU is trying to look it up - * which is very expensive. usually the layers above lookup - * before deletion */ - dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL); + dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key); RETURN(0); } @@ -749,6 +748,7 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(zap_db); + LASSERT(osd_object_is_zap(zap_db)); LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); @@ -1212,9 +1212,9 @@ static int osd_declare_index_insert(const struct lu_env *env, dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object); - /* do not specify the key as then DMU is trying to look it up - * which is very expensive. usually the layers above lookup - * before insertion */ + /* It is not clear what API should be used for binary keys, so we pass + * a null name which has the side effect of over-reserving space, + * accounting for the worst case. 
See zap_count_write() */ dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL); RETURN(0); @@ -1262,11 +1262,7 @@ static int osd_declare_index_delete(const struct lu_env *env, LASSERT(obj->oo_db); oh = container_of0(th, struct osd_thandle, ot_super); - - /* do not specify the key as then DMU is trying to look it up - * which is very expensive. usually the layers above lookup - * before deletion */ - dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL); + dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL); RETURN(0); } diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 6fb7a6e..0706ea1 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -347,17 +347,10 @@ struct osd_object { /* the i_flags in LMA */ __u32 oo_lma_flags; - union { - int oo_ea_in_bonus; /* EA bytes we expect */ - struct { - /* record size for index file */ - unsigned char oo_keysize; - unsigned char oo_recsize; - unsigned char oo_recusize; /* unit size */ - }; - }; - - + /* record size for index file */ + unsigned char oo_keysize; + unsigned char oo_recsize; + unsigned char oo_recusize; /* unit size */ }; int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *); @@ -472,12 +465,10 @@ int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type, void *buf, uint32_t buflen, struct osd_thandle *oh); int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, dmu_buf_t **zap_dbp, dmu_tx_t *tx, struct lu_attr *la, - zap_flags_t flags); + uint64_t parent, zap_flags_t flags); int __osd_object_create(const struct lu_env *env, struct osd_object *obj, - dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la); -int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, - sa_handle_t *sa_hdl, dmu_tx_t *tx, - struct lu_attr *la, uint64_t parent); + dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la, + uint64_t parent); /* osd_oi.c */ int osd_oi_init(const struct lu_env *env, struct 
osd_device *o); @@ -485,7 +476,7 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o); int osd_fid_lookup(const struct lu_env *env, struct osd_device *, const struct lu_fid *, uint64_t *); uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf, int bufsize); + const struct lu_fid *fid, char *buf); int osd_options_init(void); int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd, __u64 seq); @@ -534,8 +525,6 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, int __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int fl, struct osd_thandle *oh); -int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, - struct osd_thandle *oh); static inline int osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int fl, diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index d94bc97..eed35f5 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -168,11 +168,18 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, * LOHA_EXISTs is supposed to be the last step in the * initialization */ - /* size change (in dnode) will be declared by dmu_tx_hold_write() */ - if (dt_object_exists(dt)) + /* declare possible size change. 
notice we can't check + * current size here as another thread can change it */ + + if (dt_object_exists(dt)) { + LASSERT(obj->oo_db); oid = obj->oo_db->db_object; - else + + dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); + } else { oid = DMU_NEW_OBJECT; + dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE); + } /* XXX: we still miss for append declaration support in ZFS * -1 means append which is used by llog mostly, llog diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index ef912ad..bd1fac2 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -504,6 +504,7 @@ static int osd_declare_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { + char *buf = osd_oti_get(env)->oti_str; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -519,14 +520,17 @@ static int osd_declare_object_destroy(const struct lu_env *env, LASSERT(oh->ot_tx != NULL); /* declare that we'll remove object from fid-dnode mapping */ - zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0); - dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, NULL); + zapid = osd_get_name_n_idx(env, osd, fid, buf); + dmu_tx_hold_bonus(oh->ot_tx, zapid); + dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, buf); osd_declare_xattrs_destroy(env, obj, oh); /* declare that we'll remove object from inode accounting ZAPs */ - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, buf); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, buf); /* one less inode */ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, @@ -553,8 +557,7 @@ static int osd_declare_object_destroy(const struct lu_env *env, static int osd_object_destroy(const struct lu_env *env, struct 
dt_object *dt, struct thandle *th) { - struct osd_thread_info *info = osd_oti_get(env); - char *buf = info->oti_str; + char *buf = osd_oti_get(env)->oti_str; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); const struct lu_fid *fid = lu_object_fid(&dt->do_lu); @@ -575,7 +578,7 @@ static int osd_object_destroy(const struct lu_env *env, LASSERT(oh->ot_tx != NULL); /* remove obj ref from index dir (it depends) */ - zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str)); + zapid = osd_get_name_n_idx(env, osd, fid, buf); rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx); if (rc) { CERROR("%s: zap_remove(%s) failed: rc = %d\n", @@ -843,14 +846,13 @@ static int osd_declare_attr_set(const struct lu_env *env, struct thandle *handle) { struct osd_thread_info *info = osd_oti_get(env); + char *buf = osd_oti_get(env)->oti_str; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); - dmu_tx_hold_t *txh; struct osd_thandle *oh; uint64_t bspace; uint32_t blksize; int rc = 0; - bool found; ENTRY; @@ -865,39 +867,20 @@ static int osd_declare_attr_set(const struct lu_env *env, LASSERT(obj->oo_sa_hdl != NULL); LASSERT(oh->ot_tx != NULL); - /* regular attributes are part of the bonus buffer */ - /* let's check whether this object is already part of - * transaction.. 
*/ - found = false; - for (txh = list_head(&oh->ot_tx->tx_holds); txh; - txh = list_next(&oh->ot_tx->tx_holds, txh)) { - if (txh->txh_dnode == NULL) - continue; - if (txh->txh_dnode->dn_object != obj->oo_db->db_object) - continue; - /* this object is part of the transaction already - * we don't need to declare bonus again */ - found = true; - break; - } - if (!found) - dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object); + dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); if (oh->ot_tx->tx_err != 0) GOTO(out, rc = -oh->ot_tx->tx_err); - if (attr && attr->la_valid & LA_FLAGS) { - /* LMA is usually a part of bonus, no need to declare - * anything else */ - } + sa_object_size(obj->oo_sa_hdl, &blksize, &bspace); + bspace = toqb(bspace * blksize); - if (attr && (attr->la_valid & (LA_UID | LA_GID))) { - sa_object_size(obj->oo_sa_hdl, &blksize, &bspace); - bspace = toqb(bspace * blksize); - } + __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs), + XATTR_NAME_LMA, oh); if (attr && attr->la_valid & LA_UID) { /* account for user inode tracking ZAP update */ - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf); /* quota enforcement for user */ if (attr->la_uid != obj->oo_attr.la_uid) { @@ -911,7 +894,8 @@ static int osd_declare_attr_set(const struct lu_env *env, } if (attr && attr->la_valid & LA_GID) { /* account for user inode tracking ZAP update */ - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf); /* quota enforcement for group */ if (attr->la_gid != obj->oo_attr.la_gid) { @@ -1137,12 +1121,13 @@ static int osd_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *handle) { + char *buf = osd_oti_get(env)->oti_str; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = 
osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t zapid; - int rc, dnode_size; + int rc; ENTRY; LASSERT(dof); @@ -1162,26 +1147,18 @@ static int osd_declare_object_create(const struct lu_env *env, oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_tx != NULL); - /* this is the minimum set of EAs on every Lustre object */ - obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE + - sizeof(__u64) + /* VBR VERSION */ - sizeof(struct lustre_mdt_attrs); /* LMA */ - /* reserve 32 bytes for extra stuff like ACLs */ - dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32); - switch (dof->dof_type) { case DFT_DIR: dt->do_index_ops = &osd_dir_ops; case DFT_INDEX: /* for zap create */ - dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL); - dmu_tx_hold_sa_create(oh->ot_tx, dnode_size); + dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL); break; case DFT_REGULAR: case DFT_SYM: case DFT_NODE: /* first, we'll create new object */ - dmu_tx_hold_sa_create(oh->ot_tx, dnode_size); + dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT); break; default: @@ -1190,12 +1167,20 @@ static int osd_declare_object_create(const struct lu_env *env, } /* and we'll add it to some mapping */ - zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0); - dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, NULL); + zapid = osd_get_name_n_idx(env, osd, fid, buf); + dmu_tx_hold_bonus(oh->ot_tx, zapid); + dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf); /* we will also update inode accounting ZAPs */ - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf); + dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf); + + dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE); + + __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs), + 
XATTR_NAME_LMA, oh); rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh, false, NULL, false); @@ -1203,9 +1188,10 @@ static int osd_declare_object_create(const struct lu_env *env, } int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, - sa_handle_t *sa_hdl, dmu_tx_t *tx, - struct lu_attr *la, uint64_t parent) + uint64_t oid, dmu_tx_t *tx, struct lu_attr *la, + uint64_t parent) { + sa_handle_t *sa_hdl; sa_bulk_attr_t *bulk = osd_oti_get(env)->oti_attr_bulk; struct osa_attr *osa = &osd_oti_get(env)->oti_osa; uint64_t gen; @@ -1214,10 +1200,9 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, int cnt; int rc; - LASSERT(sa_hdl); - - gen = dmu_tx_get_txg(tx); gethrestime(&now); + gen = dmu_tx_get_txg(tx); + ZFS_TIME_ENCODE(&now, crtime); osa->atime[0] = la->la_atime; @@ -1231,6 +1216,11 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, osa->flags = attrs_fs2zfs(la->la_flags); osa->size = la->la_size; + /* Now add in all of the "SA" attributes */ + rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc) + return rc; + /* * we need to create all SA below upon object create. * @@ -1259,6 +1249,7 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx); + sa_handle_destroy(sa_hdl); return rc; } @@ -1268,7 +1259,8 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, * to a transaction group. 
*/ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, - dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la) + dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la, + uint64_t parent) { uint64_t oid; int rc; @@ -1276,6 +1268,10 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS; + /* Assert that the transaction has been assigned to a + transaction group. */ + LASSERT(tx->tx_txg != 0); + /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks * would get an additional ditto copy */ if (unlikely(S_ISREG(la->la_mode) && @@ -1291,6 +1287,14 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, la->la_size = 0; la->la_nlink = 1; + rc = __osd_attr_init(env, osd, oid, tx, la, parent); + if (rc != 0) { + sa_buf_rele(*dbp, osd_obj_tag); + *dbp = NULL; + dmu_object_free(osd->od_os, oid, tx); + return rc; + } + return 0; } @@ -1306,7 +1310,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, * a conversion from the different internal ZAP hash formats being used. 
*/ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, dmu_buf_t **zap_dbp, dmu_tx_t *tx, - struct lu_attr *la, zap_flags_t flags) + struct lu_attr *la, uint64_t parent, zap_flags_t flags) { uint64_t oid; int rc; @@ -1325,14 +1329,16 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, if (rc) return rc; + LASSERT(la->la_valid & LA_MODE); la->la_size = 2; la->la_nlink = 1; - return 0; + return __osd_attr_init(env, osd, oid, tx, la, parent); } static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, struct osd_thandle *oh) + struct lu_attr *la, uint64_t parent, + struct osd_thandle *oh) { dmu_buf_t *db; int rc; @@ -1342,7 +1348,7 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use * binary keys */ LASSERT(S_ISREG(la->la_mode)); - rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, + rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, parent, ZAP_FLAG_UINT64_KEY); if (rc) return ERR_PTR(rc); @@ -1350,20 +1356,23 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, } static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, struct osd_thandle *oh) + struct lu_attr *la, uint64_t parent, + struct osd_thandle *oh) { dmu_buf_t *db; int rc; LASSERT(S_ISDIR(la->la_mode)); - rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0); + rc = __osd_zap_create(env, osd_obj2dev(obj), &db, + oh->ot_tx, la, parent, 0); if (rc) return ERR_PTR(rc); return db; } static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, struct osd_thandle *oh) + struct lu_attr *la, uint64_t parent, + struct osd_thandle *oh) { const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); dmu_buf_t *db; @@ -1371,7 +1380,7 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, 
struct osd_object *obj, struct osd_device *osd = osd_obj2dev(obj); LASSERT(S_ISREG(la->la_mode)); - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); if (rc) return ERR_PTR(rc); @@ -1393,28 +1402,31 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj, } static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, struct osd_thandle *oh) + struct lu_attr *la, uint64_t parent, + struct osd_thandle *oh) { dmu_buf_t *db; int rc; LASSERT(S_ISLNK(la->la_mode)); - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); if (rc) return ERR_PTR(rc); return db; } static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, struct osd_thandle *oh) + struct lu_attr *la, uint64_t parent, + struct osd_thandle *oh) { dmu_buf_t *db; int rc; + la->la_valid = LA_MODE; if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) la->la_valid |= LA_RDEV; - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); if (rc) return ERR_PTR(rc); return db; @@ -1423,6 +1435,7 @@ static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj, typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env, struct osd_object *obj, struct lu_attr *la, + uint64_t parent, struct osd_thandle *oh); static osd_obj_type_f osd_create_type_f(enum dt_format_type type) @@ -1453,6 +1466,28 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type) } /* + * Primitives for directory (i.e. 
ZAP) handling + */ +static inline int osd_init_lma(const struct lu_env *env, struct osd_object *obj, + const struct lu_fid *fid, struct osd_thandle *oh) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct lu_buf buf; + int rc; + + lustre_lma_init(lma, fid, 0, 0); + lustre_lma_swab(lma); + buf.lb_buf = lma; + buf.lb_len = sizeof(*lma); + + rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA, + LU_XATTR_CREATE, oh); + + return rc; +} + +/* * Concurrency: @dt is write locked. */ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, @@ -1461,16 +1496,14 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_object_format *dof, struct thandle *th) { - struct osd_thread_info *info = osd_oti_get(env); - struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; - struct zpl_direntry *zde = &info->oti_zde.lzd_reg; + struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); - char *buf = info->oti_str; + char *buf = osd_oti_get(env)->oti_str; struct osd_thandle *oh; - dmu_buf_t *db = NULL; - uint64_t zapid, parent = 0; + dmu_buf_t *db; + uint64_t zapid; int rc; ENTRY; @@ -1497,68 +1530,25 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, /* to follow ZFS on-disk format we need * to initialize parent dnode properly */ + zapid = 0; if (hint != NULL && hint->dah_parent != NULL && !dt_object_remote(hint->dah_parent)) - parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object; + zapid = osd_dt_obj(hint->dah_parent)->oo_db->db_object; - /* we may fix some attributes, better do not change the source */ - obj->oo_attr = *attr; - obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | LA_TYPE; - - db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh); - if (IS_ERR(db)) { - rc = 
PTR_ERR(db); - db = NULL; - GOTO(out, rc); - } + db = osd_create_type_f(dof->dof_type)(env, obj, attr, zapid, oh); + if (IS_ERR(db)) + GOTO(out, rc = PTR_ERR(db)); zde->zde_pad = 0; zde->zde_dnode = db->db_object; zde->zde_type = IFTODT(attr->la_mode & S_IFMT); - zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str)); + zapid = osd_get_name_n_idx(env, osd, fid, buf); rc = -zap_add(osd->od_os, zapid, buf, 8, 1, zde, oh->ot_tx); if (rc) GOTO(out, rc); - /* Now add in all of the "SA" attributes */ - rc = -sa_handle_get(osd->od_os, db->db_object, NULL, - SA_HDL_PRIVATE, &obj->oo_sa_hdl); - if (rc) - GOTO(out, rc); - - /* configure new osd object */ - obj->oo_db = db; - parent = parent != 0 ? parent : zapid; - rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx, - &obj->oo_attr, parent); - if (rc) - GOTO(out, rc); - - /* XXX: oo_lma_flags */ - obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT; - smp_mb(); - obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS; - if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu)))) - /* no body operations for accounting objects */ - obj->oo_dt.do_body_ops = &osd_body_ops; - - rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP); - if (rc) - GOTO(out, rc); - - /* initialize LMA */ - lustre_lma_init(lma, lu_object_fid(&obj->oo_dt.do_lu), 0, 0); - lustre_lma_swab(lma); - rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA, - (uchar_t *)lma, sizeof(*lma)); - if (rc) - GOTO(out, rc); - rc = __osd_sa_xattr_update(env, obj, oh); - if (rc) - GOTO(out, rc); - /* Add new object to inode accounting. 
* Errors are not considered as fatal */ rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid, @@ -1574,12 +1564,18 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d " "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc); + /* configure new osd object */ + obj->oo_db = db; + rc = osd_object_init0(env, obj); + LASSERT(ergo(rc == 0, dt_object_exists(dt))); + LASSERT(osd_invariant(obj)); + + rc = osd_init_lma(env, obj, fid, oh); + if (rc != 0) + CERROR("%s: can not set LMA on "DFID": rc = %d\n", + osd->od_svname, PFID(fid), rc); + out: - if (unlikely(rc && db)) { - dmu_object_free(osd->od_os, db->db_object, oh->ot_tx); - sa_buf_rele(db, osd_obj_tag); - obj->oo_db = NULL; - } up_write(&obj->oo_guard); RETURN(rc); } diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 6d3df99..560c90f 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -142,9 +142,8 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o, { struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; struct lu_attr *la = &osd_oti_get(env)->oti_la; - sa_handle_t *sa_hdl = NULL; + dmu_buf_t *db; dmu_tx_t *tx; - uint64_t oid; int rc; /* verify it doesn't already exist */ @@ -169,36 +168,21 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o, return rc; } - oid = zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64, - DMU_OT_DIRECTORY_CONTENTS, - 14, /* == ZFS fzap_default_block_shift */ - DN_MAX_INDBLKSHIFT, /* indirect block shift */ - DMU_OT_SA, DN_MAX_BONUSLEN, tx); - - rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) - goto commit; la->la_valid = LA_MODE | LA_UID | LA_GID; la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; la->la_uid = la->la_gid = 0; - rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent); - sa_handle_destroy(sa_hdl); - if (rc) - goto commit; + __osd_zap_create(env, o, &db, tx, la, parent, 0); - zde->zde_dnode = oid; 
+ zde->zde_dnode = db->db_object; zde->zde_pad = 0; zde->zde_type = IFTODT(S_IFDIR); rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx); -commit: - if (rc) - dmu_object_free(o->od_os, oid, tx); dmu_tx_commit(tx); - if (rc == 0) - *child = oid; + *child = db->db_object; + sa_buf_rele(db, osd_obj_tag); return rc; } @@ -382,7 +366,7 @@ out: */ static uint64_t osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf, int bufsize) + const struct lu_fid *fid, char *buf) { struct osd_seq *osd_seq; unsigned long b; @@ -407,8 +391,7 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, b = id % OSD_OST_MAP_SIZE; LASSERT(osd_seq->os_compat_dirs[b]); - if (buf) - snprintf(buf, bufsize, LPU64, id); + sprintf(buf, LPU64, id); return osd_seq->os_compat_dirs[b]; } @@ -433,29 +416,28 @@ osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid, LASSERT(osd->od_oi_table != NULL); oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)]; - if (buf) - osd_fid2str(buf, fid); + osd_fid2str(buf, fid); return oi->oi_zapid; } uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf, int bufsize) + const struct lu_fid *fid, char *buf) { uint64_t zapid; LASSERT(fid); + LASSERT(buf); if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) { - zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize); + zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf); } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) { /* special objects with fixed known fids get their name */ char *name = oid2name(fid_oid(fid)); if (name) { zapid = osd->od_root; - if (buf) - strncpy(buf, name, bufsize); + strcpy(buf, name); if (fid_is_acct(fid)) zapid = MASTER_NODE_OBJ; } else { @@ -495,8 +477,8 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev, } else if (unlikely(fid_is_fs_root(fid))) { *oid = dev->od_root; } else 
{ - zapid = osd_get_name_n_idx(env, dev, fid, buf, - sizeof(info->oti_buf)); + zapid = osd_get_name_n_idx(env, dev, fid, buf); + rc = -zap_lookup(dev->od_os, zapid, buf, 8, 1, &info->oti_zde); if (rc) diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index 0ea9977..218464f 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -267,18 +267,39 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -/* the function is used to declare EAs when SA is not supported */ -void __osd_xattr_declare_legacy(const struct lu_env *env, - struct osd_object *obj, - int vallen, const char *name, - struct osd_thandle *oh) +void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, + int vallen, const char *name, + struct osd_thandle *oh) { struct osd_device *osd = osd_obj2dev(obj); - dmu_tx_t *tx = oh->ot_tx; - uint64_t xa_data_obj; - int rc; + dmu_buf_t *db = obj->oo_db; + dmu_tx_t *tx = oh->ot_tx; + uint64_t xa_data_obj; + int rc = 0; + int here; - if (obj->oo_xattr == ZFS_NO_OBJECT) { + if (unlikely(obj->oo_destroyed)) + return; + + here = dt_object_exists(&obj->oo_dt); + + /* object may be not yet created */ + if (here) { + LASSERT(db); + LASSERT(obj->oo_sa_hdl); + /* we might just update SA_ZPL_DXATTR */ + dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1); + + if (obj->oo_xattr == ZFS_NO_OBJECT) + rc = -ENOENT; + } + + if (!here || rc == -ENOENT) { + /* we'll be updating SA_ZPL_XATTR */ + if (here) { + LASSERT(obj->oo_sa_hdl); + dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1); + } /* xattr zap + entry */ dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name); /* xattr value obj */ @@ -297,6 +318,7 @@ void __osd_xattr_declare_legacy(const struct lu_env *env, dmu_tx_hold_bonus(tx, xa_data_obj); dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END); dmu_tx_hold_write(tx, xa_data_obj, 0, vallen); + return; } else if (rc == -ENOENT) { /* * Entry doesn't exist, we need to create a new one and a new @@ -306,43 
+328,11 @@ void __osd_xattr_declare_legacy(const struct lu_env *env, dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name); dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE); dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen); - } -} - -void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, - int vallen, const char *name, - struct osd_thandle *oh) -{ - dmu_buf_t *db = obj->oo_db; - dmu_tx_t *tx = oh->ot_tx; - - if (unlikely(obj->oo_destroyed)) - return; - - if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) { - __osd_xattr_declare_legacy(env, obj, vallen, name, oh); return; } - /* declare EA in SA */ - if (dt_object_exists(&obj->oo_dt)) { - LASSERT(obj->oo_sa_hdl); - /* XXX: it should be possible to skip spill - * declaration if specific EA is part of - * bonus and doesn't grow */ - dmu_tx_hold_spill(tx, db->db_object); - return; - } - - /* the object doesn't exist, but we've declared bonus - * in osd_declare_object_create() yet */ - if (obj->oo_ea_in_bonus > DN_MAX_BONUSLEN) { - /* spill has been declared already */ - } else if (obj->oo_ea_in_bonus + vallen > DN_MAX_BONUSLEN) { - /* we're about to exceed bonus, let's declare spill */ - dmu_tx_hold_spill(tx, DMU_NEW_OBJECT); - } - obj->oo_ea_in_bonus += vallen; + /* An error happened */ + tx->tx_err = -rc; } int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, @@ -504,7 +494,8 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, la->la_valid = LA_MODE; la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; - rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, 0); + rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, + obj->oo_db->db_object, 0); if (rc) return rc; @@ -559,7 +550,8 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, la->la_valid = LA_MODE; la->la_mode = S_IFREG | S_IRUGO | S_IWUSR; - rc = __osd_object_create(env, obj, &xa_data_db, tx, la); + rc = __osd_object_create(env, obj, &xa_data_db, tx, la, + obj->oo_xattr); if (rc) goto 
out; xa_data_obj = xa_data_db->db_object; -- 1.8.3.1