From ead6df2feee9c143b617cb60e50e403c955bd401 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Wed, 23 Mar 2016 21:42:54 +0300 Subject: [PATCH] LU-7898 osd: remove unnecessary declarations Refactor the code a bit to remove unnecessary declarations (which are very expensive in ZFS). The patch also introduces initial preparations to support large dnodes - it tracks all declared EAs at object creation and the tracked number can be used to request a dnode of the appropriate size. With this patch + LU-7918, disk/memory space reserved for a single-stripe creation goes down from ~33MB to 4.6MB. Performance improvements from this patch are also significant. Running mdtest create performance on a test node (ramdisk): Threads 0.6.5 0.6.5+patch 1 9933 14279 2 12870 20469 4 16405 26407 8 19320 28254 16 15648 26620 32 14107 26483 Change-Id: I0778ad8d13ba1f7a5fa5ad5d874fbb1bd7203958 Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/19101 Reviewed-by: John L. Hammond Reviewed-by: Andreas Dilger Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/osd-zfs/osd_index.c | 30 ++--- lustre/osd-zfs/osd_internal.h | 27 +++-- lustre/osd-zfs/osd_io.c | 13 +-- lustre/osd-zfs/osd_object.c | 254 +++++++++++++++++++++--------------------- lustre/osd-zfs/osd_oi.c | 46 +++++--- lustre/osd-zfs/osd_xattr.c | 82 ++++++++------ 6 files changed, 245 insertions(+), 207 deletions(-) diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c index 8c1ffbf..5f86160 100644 --- a/lustre/osd-zfs/osd_index.c +++ b/lustre/osd-zfs/osd_index.c @@ -165,7 +165,6 @@ static struct dt_it *osd_index_it_init(const struct lu_env *env, LASSERT(lu_object_exists(lo)); LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); LASSERT(info); OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS); @@ -425,8 +424,6 @@ static int osd_dir_lookup(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - LASSERT(osd_object_is_zap(obj->oo_db)); - if (name[0] == '.') { if 
(name[1] == 0) { const struct lu_fid *f = lu_object_fid(&dt->do_lu); @@ -476,8 +473,10 @@ static int osd_declare_dir_insert(const struct lu_env *env, else object = obj->oo_db->db_object; - dmu_tx_hold_bonus(oh->ot_tx, object); - dmu_tx_hold_zap(oh->ot_tx, object, TRUE, (char *)key); + /* do not specify the key as then DMU is trying to look it up + * which is very expensive. usually the layers above lookup + * before insertion */ + dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL); RETURN(0); } @@ -624,7 +623,6 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(parent->oo_db); - LASSERT(osd_object_is_zap(parent->oo_db)); LASSERT(dt_object_exists(dt)); LASSERT(osd_invariant(parent)); @@ -726,12 +724,15 @@ static int osd_declare_dir_delete(const struct lu_env *env, if (dt_object_exists(dt)) { LASSERT(obj->oo_db); - LASSERT(osd_object_is_zap(obj->oo_db)); dnode = obj->oo_db->db_object; } else { dnode = DMU_NEW_OBJECT; } - dmu_tx_hold_zap(oh->ot_tx, dnode, TRUE, (char *)key); + + /* do not specify the key as then DMU is trying to look it up + * which is very expensive. usually the layers above lookup + * before deletion */ + dmu_tx_hold_zap(oh->ot_tx, dnode, FALSE, NULL); RETURN(0); } @@ -748,7 +749,6 @@ static int osd_dir_delete(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(zap_db); - LASSERT(osd_object_is_zap(zap_db)); LASSERT(th != NULL); oh = container_of0(th, struct osd_thandle, ot_super); @@ -1212,9 +1212,9 @@ static int osd_declare_index_insert(const struct lu_env *env, dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object); - /* It is not clear what API should be used for binary keys, so we pass - * a null name which has the side effect of over-reserving space, - * accounting for the worst case. See zap_count_write() */ + /* do not specify the key as then DMU is trying to look it up + * which is very expensive. 
usually the layers above lookup + * before insertion */ dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL); RETURN(0); @@ -1262,7 +1262,11 @@ static int osd_declare_index_delete(const struct lu_env *env, LASSERT(obj->oo_db); oh = container_of0(th, struct osd_thandle, ot_super); - dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, TRUE, NULL); + + /* do not specify the key as then DMU is trying to look it up + * which is very expensive. usually the layers above lookup + * before deletion */ + dmu_tx_hold_zap(oh->ot_tx, obj->oo_db->db_object, FALSE, NULL); RETURN(0); } diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 0706ea1..6fb7a6e 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -347,10 +347,17 @@ struct osd_object { /* the i_flags in LMA */ __u32 oo_lma_flags; - /* record size for index file */ - unsigned char oo_keysize; - unsigned char oo_recsize; - unsigned char oo_recusize; /* unit size */ + union { + int oo_ea_in_bonus; /* EA bytes we expect */ + struct { + /* record size for index file */ + unsigned char oo_keysize; + unsigned char oo_recsize; + unsigned char oo_recusize; /* unit size */ + }; + }; + + }; int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *); @@ -465,10 +472,12 @@ int osd_object_sa_update(struct osd_object *obj, sa_attr_type_t type, void *buf, uint32_t buflen, struct osd_thandle *oh); int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, dmu_buf_t **zap_dbp, dmu_tx_t *tx, struct lu_attr *la, - uint64_t parent, zap_flags_t flags); + zap_flags_t flags); int __osd_object_create(const struct lu_env *env, struct osd_object *obj, - dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la, - uint64_t parent); + dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la); +int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, + sa_handle_t *sa_hdl, dmu_tx_t *tx, + struct lu_attr *la, uint64_t parent); /* osd_oi.c */ int 
osd_oi_init(const struct lu_env *env, struct osd_device *o); @@ -476,7 +485,7 @@ void osd_oi_fini(const struct lu_env *env, struct osd_device *o); int osd_fid_lookup(const struct lu_env *env, struct osd_device *, const struct lu_fid *, uint64_t *); uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf); + const struct lu_fid *fid, char *buf, int bufsize); int osd_options_init(void); int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd, __u64 seq); @@ -525,6 +534,8 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, int __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int fl, struct osd_thandle *oh); +int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh); static inline int osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int fl, diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index eed35f5..d94bc97 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -168,18 +168,11 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, * LOHA_EXISTs is supposed to be the last step in the * initialization */ - /* declare possible size change. 
notice we can't check - * current size here as another thread can change it */ - - if (dt_object_exists(dt)) { - LASSERT(obj->oo_db); + /* size change (in dnode) will be declared by dmu_tx_hold_write() */ + if (dt_object_exists(dt)) oid = obj->oo_db->db_object; - - dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); - } else { + else oid = DMU_NEW_OBJECT; - dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE); - } /* XXX: we still miss for append declaration support in ZFS * -1 means append which is used by llog mostly, llog diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index bd1fac2..ef912ad 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -504,7 +504,6 @@ static int osd_declare_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - char *buf = osd_oti_get(env)->oti_str; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); @@ -520,17 +519,14 @@ static int osd_declare_object_destroy(const struct lu_env *env, LASSERT(oh->ot_tx != NULL); /* declare that we'll remove object from fid-dnode mapping */ - zapid = osd_get_name_n_idx(env, osd, fid, buf); - dmu_tx_hold_bonus(oh->ot_tx, zapid); - dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, buf); + zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0); + dmu_tx_hold_zap(oh->ot_tx, zapid, FALSE, NULL); osd_declare_xattrs_destroy(env, obj, oh); /* declare that we'll remove object from inode accounting ZAPs */ - dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, buf); - dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, buf); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); /* one less inode */ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, @@ -557,7 +553,8 @@ static int 
osd_declare_object_destroy(const struct lu_env *env, static int osd_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - char *buf = osd_oti_get(env)->oti_str; + struct osd_thread_info *info = osd_oti_get(env); + char *buf = info->oti_str; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); const struct lu_fid *fid = lu_object_fid(&dt->do_lu); @@ -578,7 +575,7 @@ static int osd_object_destroy(const struct lu_env *env, LASSERT(oh->ot_tx != NULL); /* remove obj ref from index dir (it depends) */ - zapid = osd_get_name_n_idx(env, osd, fid, buf); + zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str)); rc = -zap_remove(osd->od_os, zapid, buf, oh->ot_tx); if (rc) { CERROR("%s: zap_remove(%s) failed: rc = %d\n", @@ -846,13 +843,14 @@ static int osd_declare_attr_set(const struct lu_env *env, struct thandle *handle) { struct osd_thread_info *info = osd_oti_get(env); - char *buf = osd_oti_get(env)->oti_str; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); + dmu_tx_hold_t *txh; struct osd_thandle *oh; uint64_t bspace; uint32_t blksize; int rc = 0; + bool found; ENTRY; @@ -867,20 +865,39 @@ static int osd_declare_attr_set(const struct lu_env *env, LASSERT(obj->oo_sa_hdl != NULL); LASSERT(oh->ot_tx != NULL); - dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); + /* regular attributes are part of the bonus buffer */ + /* let's check whether this object is already part of + * transaction.. 
*/ + found = false; + for (txh = list_head(&oh->ot_tx->tx_holds); txh; + txh = list_next(&oh->ot_tx->tx_holds, txh)) { + if (txh->txh_dnode == NULL) + continue; + if (txh->txh_dnode->dn_object != obj->oo_db->db_object) + continue; + /* this object is part of the transaction already + * we don't need to declare bonus again */ + found = true; + break; + } + if (!found) + dmu_tx_hold_bonus(oh->ot_tx, obj->oo_db->db_object); if (oh->ot_tx->tx_err != 0) GOTO(out, rc = -oh->ot_tx->tx_err); - sa_object_size(obj->oo_sa_hdl, &blksize, &bspace); - bspace = toqb(bspace * blksize); + if (attr && attr->la_valid & LA_FLAGS) { + /* LMA is usually a part of bonus, no need to declare + * anything else */ + } - __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs), - XATTR_NAME_LMA, oh); + if (attr && (attr->la_valid & (LA_UID | LA_GID))) { + sa_object_size(obj->oo_sa_hdl, &blksize, &bspace); + bspace = toqb(bspace * blksize); + } if (attr && attr->la_valid & LA_UID) { /* account for user inode tracking ZAP update */ - dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); /* quota enforcement for user */ if (attr->la_uid != obj->oo_attr.la_uid) { @@ -894,8 +911,7 @@ static int osd_declare_attr_set(const struct lu_env *env, } if (attr && attr->la_valid & LA_GID) { /* account for user inode tracking ZAP update */ - dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); /* quota enforcement for group */ if (attr->la_gid != obj->oo_attr.la_gid) { @@ -1121,13 +1137,12 @@ static int osd_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *handle) { - char *buf = osd_oti_get(env)->oti_str; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = 
osd_obj2dev(obj); struct osd_thandle *oh; uint64_t zapid; - int rc; + int rc, dnode_size; ENTRY; LASSERT(dof); @@ -1147,18 +1162,26 @@ static int osd_declare_object_create(const struct lu_env *env, oh = container_of0(handle, struct osd_thandle, ot_super); LASSERT(oh->ot_tx != NULL); + /* this is the minimum set of EAs on every Lustre object */ + obj->oo_ea_in_bonus = ZFS_SA_BASE_ATTR_SIZE + + sizeof(__u64) + /* VBR VERSION */ + sizeof(struct lustre_mdt_attrs); /* LMA */ + /* reserve 32 bytes for extra stuff like ACLs */ + dnode_size = size_roundup_power2(obj->oo_ea_in_bonus + 32); + switch (dof->dof_type) { case DFT_DIR: dt->do_index_ops = &osd_dir_ops; case DFT_INDEX: /* for zap create */ - dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, 1, NULL); + dmu_tx_hold_zap(oh->ot_tx, DMU_NEW_OBJECT, FALSE, NULL); + dmu_tx_hold_sa_create(oh->ot_tx, dnode_size); break; case DFT_REGULAR: case DFT_SYM: case DFT_NODE: /* first, we'll create new object */ - dmu_tx_hold_bonus(oh->ot_tx, DMU_NEW_OBJECT); + dmu_tx_hold_sa_create(oh->ot_tx, dnode_size); break; default: @@ -1167,20 +1190,12 @@ static int osd_declare_object_create(const struct lu_env *env, } /* and we'll add it to some mapping */ - zapid = osd_get_name_n_idx(env, osd, fid, buf); - dmu_tx_hold_bonus(oh->ot_tx, zapid); - dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, buf); + zapid = osd_get_name_n_idx(env, osd, fid, NULL, 0); + dmu_tx_hold_zap(oh->ot_tx, zapid, TRUE, NULL); /* we will also update inode accounting ZAPs */ - dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf); - dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); - dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf); - - dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE); - - __osd_xattr_declare_set(env, obj, sizeof(struct lustre_mdt_attrs), - XATTR_NAME_LMA, oh); + dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL); + dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL); rc = 
osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh, false, NULL, false); @@ -1188,10 +1203,9 @@ static int osd_declare_object_create(const struct lu_env *env, } int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, - uint64_t oid, dmu_tx_t *tx, struct lu_attr *la, - uint64_t parent) + sa_handle_t *sa_hdl, dmu_tx_t *tx, + struct lu_attr *la, uint64_t parent) { - sa_handle_t *sa_hdl; sa_bulk_attr_t *bulk = osd_oti_get(env)->oti_attr_bulk; struct osa_attr *osa = &osd_oti_get(env)->oti_osa; uint64_t gen; @@ -1200,9 +1214,10 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, int cnt; int rc; - gethrestime(&now); - gen = dmu_tx_get_txg(tx); + LASSERT(sa_hdl); + gen = dmu_tx_get_txg(tx); + gethrestime(&now); ZFS_TIME_ENCODE(&now, crtime); osa->atime[0] = la->la_atime; @@ -1216,11 +1231,6 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, osa->flags = attrs_fs2zfs(la->la_flags); osa->size = la->la_size; - /* Now add in all of the "SA" attributes */ - rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); - if (rc) - return rc; - /* * we need to create all SA below upon object create. * @@ -1249,7 +1259,6 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx); - sa_handle_destroy(sa_hdl); return rc; } @@ -1259,8 +1268,7 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, * to a transaction group. 
*/ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, - dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la, - uint64_t parent) + dmu_buf_t **dbp, dmu_tx_t *tx, struct lu_attr *la) { uint64_t oid; int rc; @@ -1268,10 +1276,6 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); dmu_object_type_t type = DMU_OT_PLAIN_FILE_CONTENTS; - /* Assert that the transaction has been assigned to a - transaction group. */ - LASSERT(tx->tx_txg != 0); - /* Use DMU_OTN_UINT8_METADATA for local objects so their data blocks * would get an additional ditto copy */ if (unlikely(S_ISREG(la->la_mode) && @@ -1287,14 +1291,6 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, la->la_size = 0; la->la_nlink = 1; - rc = __osd_attr_init(env, osd, oid, tx, la, parent); - if (rc != 0) { - sa_buf_rele(*dbp, osd_obj_tag); - *dbp = NULL; - dmu_object_free(osd->od_os, oid, tx); - return rc; - } - return 0; } @@ -1310,7 +1306,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, * a conversion from the different internal ZAP hash formats being used. 
*/ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, dmu_buf_t **zap_dbp, dmu_tx_t *tx, - struct lu_attr *la, uint64_t parent, zap_flags_t flags) + struct lu_attr *la, zap_flags_t flags) { uint64_t oid; int rc; @@ -1329,16 +1325,14 @@ int __osd_zap_create(const struct lu_env *env, struct osd_device *osd, if (rc) return rc; - LASSERT(la->la_valid & LA_MODE); la->la_size = 2; la->la_nlink = 1; - return __osd_attr_init(env, osd, oid, tx, la, parent); + return 0; } static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, uint64_t parent, - struct osd_thandle *oh) + struct lu_attr *la, struct osd_thandle *oh) { dmu_buf_t *db; int rc; @@ -1348,7 +1342,7 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, * We set ZAP_FLAG_UINT64_KEY to let ZFS know than we are going to use * binary keys */ LASSERT(S_ISREG(la->la_mode)); - rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, parent, + rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, ZAP_FLAG_UINT64_KEY); if (rc) return ERR_PTR(rc); @@ -1356,23 +1350,20 @@ static dmu_buf_t *osd_mkidx(const struct lu_env *env, struct osd_object *obj, } static dmu_buf_t *osd_mkdir(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, uint64_t parent, - struct osd_thandle *oh) + struct lu_attr *la, struct osd_thandle *oh) { dmu_buf_t *db; int rc; LASSERT(S_ISDIR(la->la_mode)); - rc = __osd_zap_create(env, osd_obj2dev(obj), &db, - oh->ot_tx, la, parent, 0); + rc = __osd_zap_create(env, osd_obj2dev(obj), &db, oh->ot_tx, la, 0); if (rc) return ERR_PTR(rc); return db; } static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, uint64_t parent, - struct osd_thandle *oh) + struct lu_attr *la, struct osd_thandle *oh) { const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu); dmu_buf_t *db; @@ -1380,7 +1371,7 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, 
struct osd_object *obj, struct osd_device *osd = osd_obj2dev(obj); LASSERT(S_ISREG(la->la_mode)); - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); if (rc) return ERR_PTR(rc); @@ -1402,31 +1393,28 @@ static dmu_buf_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj, } static dmu_buf_t *osd_mksym(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, uint64_t parent, - struct osd_thandle *oh) + struct lu_attr *la, struct osd_thandle *oh) { dmu_buf_t *db; int rc; LASSERT(S_ISLNK(la->la_mode)); - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); if (rc) return ERR_PTR(rc); return db; } static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj, - struct lu_attr *la, uint64_t parent, - struct osd_thandle *oh) + struct lu_attr *la, struct osd_thandle *oh) { dmu_buf_t *db; int rc; - la->la_valid = LA_MODE; if (S_ISCHR(la->la_mode) || S_ISBLK(la->la_mode)) la->la_valid |= LA_RDEV; - rc = __osd_object_create(env, obj, &db, oh->ot_tx, la, parent); + rc = __osd_object_create(env, obj, &db, oh->ot_tx, la); if (rc) return ERR_PTR(rc); return db; @@ -1435,7 +1423,6 @@ static dmu_buf_t *osd_mknod(const struct lu_env *env, struct osd_object *obj, typedef dmu_buf_t *(*osd_obj_type_f)(const struct lu_env *env, struct osd_object *obj, struct lu_attr *la, - uint64_t parent, struct osd_thandle *oh); static osd_obj_type_f osd_create_type_f(enum dt_format_type type) @@ -1466,28 +1453,6 @@ static osd_obj_type_f osd_create_type_f(enum dt_format_type type) } /* - * Primitives for directory (i.e. 
ZAP) handling - */ -static inline int osd_init_lma(const struct lu_env *env, struct osd_object *obj, - const struct lu_fid *fid, struct osd_thandle *oh) -{ - struct osd_thread_info *info = osd_oti_get(env); - struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; - struct lu_buf buf; - int rc; - - lustre_lma_init(lma, fid, 0, 0); - lustre_lma_swab(lma); - buf.lb_buf = lma; - buf.lb_len = sizeof(*lma); - - rc = osd_xattr_set_internal(env, obj, &buf, XATTR_NAME_LMA, - LU_XATTR_CREATE, oh); - - return rc; -} - -/* * Concurrency: @dt is write locked. */ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, @@ -1496,14 +1461,16 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_object_format *dof, struct thandle *th) { - struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; + struct osd_thread_info *info = osd_oti_get(env); + struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs; + struct zpl_direntry *zde = &info->oti_zde.lzd_reg; const struct lu_fid *fid = lu_object_fid(&dt->do_lu); struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); - char *buf = osd_oti_get(env)->oti_str; + char *buf = info->oti_str; struct osd_thandle *oh; - dmu_buf_t *db; - uint64_t zapid; + dmu_buf_t *db = NULL; + uint64_t zapid, parent = 0; int rc; ENTRY; @@ -1530,25 +1497,68 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, /* to follow ZFS on-disk format we need * to initialize parent dnode properly */ - zapid = 0; if (hint != NULL && hint->dah_parent != NULL && !dt_object_remote(hint->dah_parent)) - zapid = osd_dt_obj(hint->dah_parent)->oo_db->db_object; + parent = osd_dt_obj(hint->dah_parent)->oo_db->db_object; - db = osd_create_type_f(dof->dof_type)(env, obj, attr, zapid, oh); - if (IS_ERR(db)) - GOTO(out, rc = PTR_ERR(db)); + /* we may fix some attributes, better do not change the source */ + obj->oo_attr = *attr; + obj->oo_attr.la_valid |= LA_SIZE | LA_NLINK | 
LA_TYPE; + + db = osd_create_type_f(dof->dof_type)(env, obj, &obj->oo_attr, oh); + if (IS_ERR(db)) { + rc = PTR_ERR(db); + db = NULL; + GOTO(out, rc); + } zde->zde_pad = 0; zde->zde_dnode = db->db_object; zde->zde_type = IFTODT(attr->la_mode & S_IFMT); - zapid = osd_get_name_n_idx(env, osd, fid, buf); + zapid = osd_get_name_n_idx(env, osd, fid, buf, sizeof(info->oti_str)); rc = -zap_add(osd->od_os, zapid, buf, 8, 1, zde, oh->ot_tx); if (rc) GOTO(out, rc); + /* Now add in all of the "SA" attributes */ + rc = -sa_handle_get(osd->od_os, db->db_object, NULL, + SA_HDL_PRIVATE, &obj->oo_sa_hdl); + if (rc) + GOTO(out, rc); + + /* configure new osd object */ + obj->oo_db = db; + parent = parent != 0 ? parent : zapid; + rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx, + &obj->oo_attr, parent); + if (rc) + GOTO(out, rc); + + /* XXX: oo_lma_flags */ + obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT; + smp_mb(); + obj->oo_dt.do_lu.lo_header->loh_attr |= LOHA_EXISTS; + if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu)))) + /* no body operations for accounting objects */ + obj->oo_dt.do_body_ops = &osd_body_ops; + + rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP); + if (rc) + GOTO(out, rc); + + /* initialize LMA */ + lustre_lma_init(lma, lu_object_fid(&obj->oo_dt.do_lu), 0, 0); + lustre_lma_swab(lma); + rc = -nvlist_add_byte_array(obj->oo_sa_xattr, XATTR_NAME_LMA, + (uchar_t *)lma, sizeof(*lma)); + if (rc) + GOTO(out, rc); + rc = __osd_sa_xattr_update(env, obj, oh); + if (rc) + GOTO(out, rc); + /* Add new object to inode accounting. 
* Errors are not considered as fatal */ rc = -zap_increment_int(osd->od_os, osd->od_iusr_oid, @@ -1564,18 +1574,12 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt, CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d " "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc); - /* configure new osd object */ - obj->oo_db = db; - rc = osd_object_init0(env, obj); - LASSERT(ergo(rc == 0, dt_object_exists(dt))); - LASSERT(osd_invariant(obj)); - - rc = osd_init_lma(env, obj, fid, oh); - if (rc != 0) - CERROR("%s: can not set LMA on "DFID": rc = %d\n", - osd->od_svname, PFID(fid), rc); - out: + if (unlikely(rc && db)) { + dmu_object_free(osd->od_os, db->db_object, oh->ot_tx); + sa_buf_rele(db, osd_obj_tag); + obj->oo_db = NULL; + } up_write(&obj->oo_guard); RETURN(rc); } diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 560c90f..6d3df99 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -142,8 +142,9 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o, { struct zpl_direntry *zde = &osd_oti_get(env)->oti_zde.lzd_reg; struct lu_attr *la = &osd_oti_get(env)->oti_la; - dmu_buf_t *db; + sa_handle_t *sa_hdl = NULL; dmu_tx_t *tx; + uint64_t oid; int rc; /* verify it doesn't already exist */ @@ -168,21 +169,36 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o, return rc; } + oid = zap_create_flags(o->od_os, 0, ZAP_FLAG_HASH64, + DMU_OT_DIRECTORY_CONTENTS, + 14, /* == ZFS fzap_default_block_shift */ + DN_MAX_INDBLKSHIFT, /* indirect block shift */ + DMU_OT_SA, DN_MAX_BONUSLEN, tx); + + rc = -sa_handle_get(o->od_os, oid, NULL, SA_HDL_PRIVATE, &sa_hdl); + if (rc) + goto commit; la->la_valid = LA_MODE | LA_UID | LA_GID; la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; la->la_uid = la->la_gid = 0; - __osd_zap_create(env, o, &db, tx, la, parent, 0); + rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent); + sa_handle_destroy(sa_hdl); + if (rc) + goto commit; - zde->zde_dnode = 
db->db_object; + zde->zde_dnode = oid; zde->zde_pad = 0; zde->zde_type = IFTODT(S_IFDIR); rc = -zap_add(o->od_os, parent, name, 8, 1, (void *)zde, tx); +commit: + if (rc) + dmu_object_free(o->od_os, oid, tx); dmu_tx_commit(tx); - *child = db->db_object; - sa_buf_rele(db, osd_obj_tag); + if (rc == 0) + *child = oid; return rc; } @@ -366,7 +382,7 @@ out: */ static uint64_t osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf) + const struct lu_fid *fid, char *buf, int bufsize) { struct osd_seq *osd_seq; unsigned long b; @@ -391,7 +407,8 @@ osd_get_idx_for_ost_obj(const struct lu_env *env, struct osd_device *osd, b = id % OSD_OST_MAP_SIZE; LASSERT(osd_seq->os_compat_dirs[b]); - sprintf(buf, LPU64, id); + if (buf) + snprintf(buf, bufsize, LPU64, id); return osd_seq->os_compat_dirs[b]; } @@ -416,28 +433,29 @@ osd_get_idx_for_fid(struct osd_device *osd, const struct lu_fid *fid, LASSERT(osd->od_oi_table != NULL); oi = osd->od_oi_table[fid_seq(fid) & (osd->od_oi_count - 1)]; - osd_fid2str(buf, fid); + if (buf) + osd_fid2str(buf, fid); return oi->oi_zapid; } uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd, - const struct lu_fid *fid, char *buf) + const struct lu_fid *fid, char *buf, int bufsize) { uint64_t zapid; LASSERT(fid); - LASSERT(buf); if (fid_is_on_ost(env, osd, fid) == 1 || fid_seq(fid) == FID_SEQ_ECHO) { - zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf); + zapid = osd_get_idx_for_ost_obj(env, osd, fid, buf, bufsize); } else if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) { /* special objects with fixed known fids get their name */ char *name = oid2name(fid_oid(fid)); if (name) { zapid = osd->od_root; - strcpy(buf, name); + if (buf) + strncpy(buf, name, bufsize); if (fid_is_acct(fid)) zapid = MASTER_NODE_OBJ; } else { @@ -477,8 +495,8 @@ int osd_fid_lookup(const struct lu_env *env, struct osd_device *dev, } else if (unlikely(fid_is_fs_root(fid))) { *oid = dev->od_root; } 
else { - zapid = osd_get_name_n_idx(env, dev, fid, buf); - + zapid = osd_get_name_n_idx(env, dev, fid, buf, + sizeof(info->oti_buf)); rc = -zap_lookup(dev->od_os, zapid, buf, 8, 1, &info->oti_zde); if (rc) diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index 218464f..0ea9977 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -267,39 +267,18 @@ int osd_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, - int vallen, const char *name, - struct osd_thandle *oh) +/* the function is used to declare EAs when SA is not supported */ +void __osd_xattr_declare_legacy(const struct lu_env *env, + struct osd_object *obj, + int vallen, const char *name, + struct osd_thandle *oh) { struct osd_device *osd = osd_obj2dev(obj); - dmu_buf_t *db = obj->oo_db; - dmu_tx_t *tx = oh->ot_tx; - uint64_t xa_data_obj; - int rc = 0; - int here; - - if (unlikely(obj->oo_destroyed)) - return; - - here = dt_object_exists(&obj->oo_dt); - - /* object may be not yet created */ - if (here) { - LASSERT(db); - LASSERT(obj->oo_sa_hdl); - /* we might just update SA_ZPL_DXATTR */ - dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1); - - if (obj->oo_xattr == ZFS_NO_OBJECT) - rc = -ENOENT; - } + dmu_tx_t *tx = oh->ot_tx; + uint64_t xa_data_obj; + int rc; - if (!here || rc == -ENOENT) { - /* we'll be updating SA_ZPL_XATTR */ - if (here) { - LASSERT(obj->oo_sa_hdl); - dmu_tx_hold_sa(tx, obj->oo_sa_hdl, 1); - } + if (obj->oo_xattr == ZFS_NO_OBJECT) { /* xattr zap + entry */ dmu_tx_hold_zap(tx, DMU_NEW_OBJECT, TRUE, (char *) name); /* xattr value obj */ @@ -318,7 +297,6 @@ void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, dmu_tx_hold_bonus(tx, xa_data_obj); dmu_tx_hold_free(tx, xa_data_obj, vallen, DMU_OBJECT_END); dmu_tx_hold_write(tx, xa_data_obj, 0, vallen); - return; } else if (rc == -ENOENT) { /* * Entry doesn't exist, we need to create a new one 
and a new @@ -328,11 +306,43 @@ void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, dmu_tx_hold_zap(tx, obj->oo_xattr, TRUE, (char *) name); dmu_tx_hold_sa_create(tx, ZFS_SA_BASE_ATTR_SIZE); dmu_tx_hold_write(tx, DMU_NEW_OBJECT, 0, vallen); + } +} + +void __osd_xattr_declare_set(const struct lu_env *env, struct osd_object *obj, + int vallen, const char *name, + struct osd_thandle *oh) +{ + dmu_buf_t *db = obj->oo_db; + dmu_tx_t *tx = oh->ot_tx; + + if (unlikely(obj->oo_destroyed)) + return; + + if (unlikely(!osd_obj2dev(obj)->od_xattr_in_sa)) { + __osd_xattr_declare_legacy(env, obj, vallen, name, oh); return; } - /* An error happened */ - tx->tx_err = -rc; + /* declare EA in SA */ + if (dt_object_exists(&obj->oo_dt)) { + LASSERT(obj->oo_sa_hdl); + /* XXX: it should be possible to skip spill + * declaration if specific EA is part of + * bonus and doesn't grow */ + dmu_tx_hold_spill(tx, db->db_object); + return; + } + + /* the object doesn't exist, but we've declared bonus + * in osd_declare_object_create() already */ + if (obj->oo_ea_in_bonus > DN_MAX_BONUSLEN) { + /* spill has been declared already */ + } else if (obj->oo_ea_in_bonus + vallen > DN_MAX_BONUSLEN) { + /* we're about to exceed bonus, let's declare spill */ + dmu_tx_hold_spill(tx, DMU_NEW_OBJECT); + } + obj->oo_ea_in_bonus += vallen; } int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, @@ -494,8 +504,7 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, la->la_valid = LA_MODE; la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; - rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, - obj->oo_db->db_object, 0); + rc = __osd_zap_create(env, osd, &xa_zap_db, tx, la, 0); if (rc) return rc; @@ -550,8 +559,7 @@ __osd_xattr_set(const struct lu_env *env, struct osd_object *obj, la->la_valid = LA_MODE; la->la_mode = S_IFREG | S_IRUGO | S_IWUSR; - rc = __osd_object_create(env, obj, &xa_data_db, tx, la, - obj->oo_xattr); + rc = __osd_object_create(env, 
obj, &xa_data_db, tx, la); if (rc) goto out; xa_data_obj = xa_data_db->db_object; -- 1.8.3.1