From 2c9ff6dffdf4320af95c9db9af07a416529275f0 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Tue, 28 Feb 2017 12:44:14 +0300 Subject: [PATCH] LU-7899 osd: batch EA updates during file creation we set a number of EAs: LMA, VBR, LinkEA, LOVEA, ACLs. Calling into SA to refill spill again and again is expensive. Thus it makes sense to postpone this to osd_trans_stop(), where all changed EAs have already been collected in a temporary buffer. Change-Id: Ia2604ddafdf8b2ca4f6db4d70ead6d2d2761cd26 Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/19143 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Bobi Jam Reviewed-by: Oleg Drokin Reviewed-on: https://review.whamcloud.com/21893 --- lustre/osd-zfs/osd_handler.c | 6 +- lustre/osd-zfs/osd_internal.h | 24 +++++-- lustre/osd-zfs/osd_object.c | 71 ++++++++++++++------ lustre/osd-zfs/osd_oi.c | 2 +- lustre/osd-zfs/osd_xattr.c | 148 ++++++++++++++++++++++++++++++++++-------- 5 files changed, 195 insertions(+), 56 deletions(-) diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index fe41089..36e3e46 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -292,7 +292,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, if (oh->ot_assigned == 0) { LASSERT(oh->ot_tx); dmu_tx_abort(oh->ot_tx); - osd_object_sa_dirty_rele(oh); + osd_object_sa_dirty_rele(env, oh); osd_unlinked_list_emptify(env, osd, &unlinked, false); /* there won't be any commit, release reserved quota space now, * if any */ @@ -311,7 +311,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, LASSERT(oh->ot_tx); txg = oh->ot_tx->tx_txg; - osd_object_sa_dirty_rele(oh); + osd_object_sa_dirty_rele(env, oh); /* XXX: Once dmu_tx_commit() called, oh/th could have been freed * by osd_trans_commit_cb already. 
*/ dmu_tx_commit(oh->ot_tx); @@ -357,7 +357,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, INIT_LIST_HEAD(&oh->ot_stop_dcb_list); INIT_LIST_HEAD(&oh->ot_unlinked_list); INIT_LIST_HEAD(&oh->ot_sa_list); - sema_init(&oh->ot_sa_lock, 1); memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans)); th = &oh->ot_super; th->th_dev = dt; @@ -716,6 +715,7 @@ static void osd_key_fini(const struct lu_context *ctx, info->oti_ins_cache = NULL; info->oti_ins_cache_size = 0; } + lu_buf_free(&info->oti_xattr_lbuf); OBD_FREE_PTR(info); } diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 12702e2..bda22e9 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -213,6 +213,7 @@ struct osd_thread_info { struct osd_idmap_cache *oti_ins_cache; int oti_ins_cache_size; int oti_ins_cache_used; + struct lu_buf oti_xattr_lbuf; }; extern struct lu_context_key osd_key; @@ -228,7 +229,6 @@ struct osd_thandle { struct list_head ot_stop_dcb_list; struct list_head ot_unlinked_list; struct list_head ot_sa_list; - struct semaphore ot_sa_lock; dmu_tx_t *ot_tx; struct lquota_trans ot_quota_trans; __u32 ot_write_commit:1, @@ -358,7 +358,9 @@ struct osd_object { uint64_t oo_xattr; enum osd_destroy_type oo_destroy; - __u32 oo_destroyed:1; + __u32 oo_destroyed:1, + oo_late_xattr:1, + oo_late_attr_set:1; /* the i_flags in LMA */ __u32 oo_lma_flags; @@ -370,6 +372,7 @@ struct osd_object { unsigned char oo_recsize; unsigned char oo_recusize; /* unit size */ }; + uint64_t oo_parent; /* used only at object creation */ }; }; @@ -487,8 +490,11 @@ int osd_procfs_fini(struct osd_device *osd); /* osd_object.c */ extern char *osd_obj_tag; -void osd_object_sa_dirty_rele(struct osd_thandle *oh); int __osd_obj2dnode(objset_t *os, uint64_t oid, dnode_t **dnp); +void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh); +void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh); +int 
__osd_obj2dbuf(const struct lu_env *env, objset_t *os, + uint64_t oid, dmu_buf_t **dbp); struct lu_object *osd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d); @@ -501,7 +507,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj, dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la); int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, sa_handle_t *sa_hdl, dmu_tx_t *tx, - struct lu_attr *la, uint64_t parent); + struct lu_attr *la, uint64_t parent, nvlist_t *); /* osd_oi.c */ int osd_oi_init(const struct lu_env *env, struct osd_device *o); @@ -538,7 +544,15 @@ int osd_remote_fid(const struct lu_env *env, struct osd_device *osd, const struct lu_fid *fid); /* osd_xattr.c */ -int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl, nvlist_t **sa); +int __osd_sa_xattr_schedule_update(const struct lu_env *env, + struct osd_object *obj, + struct osd_thandle *oh); +int __osd_sa_attr_init(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh); +int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh); +int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl, + nvlist_t **sa); int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd, uint64_t xattr, struct lu_buf *buf, const char *name, int *sizep); diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index 2ed4a18..c68f8de 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -107,37 +107,45 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o) /* * Add object to list of dirty objects in tx handle. 
*/ -static void -osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh) +void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh) { if (!list_empty(&obj->oo_sa_linkage)) return; - down(&oh->ot_sa_lock); write_lock(&obj->oo_attr_lock); if (likely(list_empty(&obj->oo_sa_linkage))) list_add(&obj->oo_sa_linkage, &oh->ot_sa_list); write_unlock(&obj->oo_attr_lock); - up(&oh->ot_sa_lock); } /* * Release spill block dbuf hold for all dirty SAs. */ -void osd_object_sa_dirty_rele(struct osd_thandle *oh) +void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh) { struct osd_object *obj; - down(&oh->ot_sa_lock); while (!list_empty(&oh->ot_sa_list)) { obj = list_entry(oh->ot_sa_list.next, struct osd_object, oo_sa_linkage); - sa_spill_rele(obj->oo_sa_hdl); write_lock(&obj->oo_attr_lock); list_del_init(&obj->oo_sa_linkage); write_unlock(&obj->oo_attr_lock); + if (obj->oo_late_xattr) { + /* + * take oo_guard to protect oo_sa_xattr buffer + * from concurrent update by osd_xattr_set() + */ + LASSERT(oh->ot_assigned != 0); + down_write(&obj->oo_guard); + if (obj->oo_late_attr_set) + __osd_sa_attr_init(env, obj, oh); + else if (obj->oo_late_xattr) + __osd_sa_xattr_update(env, obj, oh); + up_write(&obj->oo_guard); + } + sa_spill_rele(obj->oo_sa_hdl); } - up(&oh->ot_sa_lock); } /* @@ -1181,7 +1189,8 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt, int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, sa_handle_t *sa_hdl, dmu_tx_t *tx, - struct lu_attr *la, uint64_t parent) + struct lu_attr *la, uint64_t parent, + nvlist_t *xattr) { sa_bulk_attr_t *bulk = osd_oti_get(env)->oti_attr_bulk; struct osa_attr *osa = &osd_oti_get(env)->oti_osa; @@ -1190,6 +1199,9 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd, timestruc_t now; int cnt; int rc; + char *dxattr = NULL; + size_t sa_size; + LASSERT(sa_hdl); @@ -1234,7 +1246,24 @@ int __osd_attr_init(const struct lu_env 
*env, struct osd_device *osd, SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8); LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk)); + if (xattr) { + rc = -nvlist_size(xattr, &sa_size, NV_ENCODE_XDR); + LASSERT(rc == 0); + + dxattr = osd_zio_buf_alloc(sa_size); + LASSERT(dxattr); + + rc = -nvlist_pack(xattr, &dxattr, &sa_size, + NV_ENCODE_XDR, KM_SLEEP); + LASSERT(rc == 0); + + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd), + NULL, dxattr, sa_size); + } + rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx); + if (dxattr) + osd_zio_buf_free(dxattr, sa_size); return rc; } @@ -1536,17 +1565,6 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt, if (rc) GOTO(out, rc); - /* configure new osd object */ - parent = parent != 0 ? parent : zapid; - rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx, - &obj->oo_attr, parent); - if (rc) - GOTO(out, rc); - - /* XXX: oo_lma_flags */ - obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT; - obj->oo_dt.do_body_ops = &osd_body_ops; - rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP); if (rc) GOTO(out, rc); @@ -1558,9 +1576,20 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt, (uchar_t *)lma, sizeof(*lma)); if (rc) GOTO(out, rc); - rc = __osd_sa_xattr_update(env, obj, oh); + + /* configure new osd object */ + obj->oo_parent = parent != 0 ? 
parent : zapid; + obj->oo_late_attr_set = 1; + rc = __osd_sa_xattr_schedule_update(env, obj, oh); if (rc) GOTO(out, rc); + + /* XXX: oo_lma_flags */ + obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT; + if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu)))) + /* no body operations for accounting objects */ + obj->oo_dt.do_body_ops = &osd_body_ops; + osd_idc_find_and_init(env, osd, obj); out: diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c index 92d5452..a9de9e3 100644 --- a/lustre/osd-zfs/osd_oi.c +++ b/lustre/osd-zfs/osd_oi.c @@ -177,7 +177,7 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o, la->la_valid = LA_MODE | LA_UID | LA_GID; la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO; la->la_uid = la->la_gid = 0; - rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent); + rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent, NULL); sa_handle_destroy(sa_hdl); if (rc) goto commit; diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index f643f8a..fac11ba 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -347,6 +347,112 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } +int __osd_sa_attr_init(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh) +{ + sa_bulk_attr_t *bulk = osd_oti_get(env)->oti_attr_bulk; + struct osa_attr *osa = &osd_oti_get(env)->oti_osa; + struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf; + struct osd_device *osd = osd_obj2dev(obj); + uint64_t crtime[2], gen; + timestruc_t now; + size_t size; + int rc, cnt; + + obj->oo_late_xattr = 0; + obj->oo_late_attr_set = 0; + + gen = dmu_tx_get_txg(oh->ot_tx); + gethrestime(&now); + ZFS_TIME_ENCODE(&now, crtime); + + osa->atime[0] = obj->oo_attr.la_atime; + osa->ctime[0] = obj->oo_attr.la_ctime; + osa->mtime[0] = obj->oo_attr.la_mtime; + osa->mode = obj->oo_attr.la_mode; + osa->uid = obj->oo_attr.la_uid; + osa->gid = obj->oo_attr.la_gid; + 
osa->rdev = obj->oo_attr.la_rdev; + osa->nlink = obj->oo_attr.la_nlink; + osa->flags = attrs_fs2zfs(obj->oo_attr.la_flags); + osa->size = obj->oo_attr.la_size; + + cnt = 0; + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8); + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL, + &obj->oo_parent, 8); + LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk)); + + /* Update the SA for additions, modifications, and removals. 
*/ + rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR); + if (rc) + return rc; + + lu_buf_check_and_alloc(lb, size); + if (lb->lb_buf == NULL) { + CERROR("%s: can't allocate buffer for xattr update\n", + osd->od_svname); + return -ENOMEM; + } + + rc = -nvlist_pack(obj->oo_sa_xattr, (char **)&lb->lb_buf, &size, + NV_ENCODE_XDR, KM_SLEEP); + if (rc) + return rc; + + SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd), NULL, lb->lb_buf, size); + + rc = -sa_replace_all_by_template(obj->oo_sa_hdl, bulk, cnt, oh->ot_tx); + + return rc; +} + +int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh) +{ + struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf; + struct osd_device *osd = osd_obj2dev(obj); + char *dxattr; + size_t size; + int rc; + + obj->oo_late_xattr = 0; + + /* Update the SA for additions, modifications, and removals. */ + rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR); + if (rc) + return rc; + + lu_buf_check_and_alloc(lb, size); + if (lb->lb_buf == NULL) { + CERROR("%s: can't allocate buffer for xattr update\n", + osd->od_svname); + return -ENOMEM; + } + + dxattr = lb->lb_buf; + rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size, + NV_ENCODE_XDR, KM_SLEEP); + if (rc) + return rc; + LASSERT(dxattr == lb->lb_buf); + + sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx); + + return 0; +} + /* * Set an extended attribute. * This transaction must have called udmu_xattr_declare_set() first. @@ -355,36 +461,20 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, * * No locking is done here. 
*/ -int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, - struct osd_thandle *oh) +int __osd_sa_xattr_schedule_update(const struct lu_env *env, + struct osd_object *obj, + struct osd_thandle *oh) { - struct osd_device *osd = osd_obj2dev(obj); - char *dxattr; - size_t sa_size; - int rc; - ENTRY; LASSERT(obj->oo_sa_hdl); LASSERT(obj->oo_sa_xattr); - /* Update the SA for additions, modifications, and removals. */ - rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR); - if (rc) - return rc; - - dxattr = osd_zio_buf_alloc(sa_size); - if (dxattr == NULL) - RETURN(-ENOMEM); + /* schedule batched SA update in osd_object_sa_dirty_rele() */ + obj->oo_late_xattr = 1; + osd_object_sa_dirty_add(obj, oh); - rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size, - NV_ENCODE_XDR, KM_SLEEP); - if (rc) - GOTO(out_free, rc); + RETURN(0); - rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh); -out_free: - osd_zio_buf_free(dxattr, sa_size); - RETURN(rc); } int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, @@ -428,7 +518,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, DATA_TYPE_BYTE_ARRAY); if (rc < 0) return rc; - rc = __osd_sa_xattr_update(env, obj, oh); + rc = __osd_sa_xattr_schedule_update(env, obj, oh); return rc == 0 ? 
-EFBIG : rc; } } else if (rc == -ENOENT) { @@ -459,7 +549,13 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, if (rc) return rc; - rc = __osd_sa_xattr_update(env, obj, oh); + /* batch updates only for just-created dnodes, where we + * typically set a number of EAs in a single transaction */ + if (obj->oo_dn->dn_allocated_txg == oh->ot_tx->tx_txg) + rc = __osd_sa_xattr_schedule_update(env, obj, oh); + else + rc = __osd_sa_xattr_update(env, obj, oh); + return rc; } @@ -671,7 +767,7 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj, rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY); if (rc == 0) - rc = __osd_sa_xattr_update(env, obj, oh); + rc = __osd_sa_xattr_schedule_update(env, obj, oh); return rc; } -- 1.8.3.1