From 6cd79ab5860c59c2a640a9e8ca4ee86eec050b43 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Fri, 25 Mar 2016 12:21:16 +0300 Subject: [PATCH] LU-7899 osd: batch EA updates during file creation we set number of EAs: LMA, VBR, LinkEA, LOVEA, ACLs. calling into SA to refill spill again and again is expensive. thus it makes sense to postpone this to osd_trans_stop() where all changed EAs has been already collected in a temporary buffer. Change-Id: I8f02a287b96615c3aa550d63ffd9dd3da51b39ee Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/19143 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Bobi Jam Reviewed-by: Oleg Drokin --- lustre/osd-zfs/osd_handler.c | 9 ++--- lustre/osd-zfs/osd_internal.h | 10 ++++-- lustre/osd-zfs/osd_object.c | 28 ++++++++++----- lustre/osd-zfs/osd_xattr.c | 80 ++++++++++++++++++++++++++++--------------- 4 files changed, 83 insertions(+), 44 deletions(-) diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index 5d1ae30..f7c831f 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -288,7 +288,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, if (oh->ot_assigned == 0) { LASSERT(oh->ot_tx); dmu_tx_abort(oh->ot_tx); - osd_object_sa_dirty_rele(oh); + osd_object_sa_dirty_rele(env, oh); osd_unlinked_list_emptify(osd, &unlinked, false); /* there won't be any commit, release reserved quota space now, * if any */ @@ -317,7 +317,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt, LASSERT(oh->ot_tx); txg = oh->ot_tx->tx_txg; - osd_object_sa_dirty_rele(oh); + osd_object_sa_dirty_rele(env, oh); /* XXX: Once dmu_tx_commit() called, oh/th could have been freed * by osd_trans_commit_cb already. */ dmu_tx_commit(oh->ot_tx); @@ -355,7 +355,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env, INIT_LIST_HEAD(&oh->ot_stop_dcb_list); INIT_LIST_HEAD(&oh->ot_unlinked_list); INIT_LIST_HEAD(&oh->ot_sa_list); - sema_init(&oh->ot_sa_lock, 1); memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans)); th = &oh->ot_super; th->th_dev = dt; @@ -680,15 +679,13 @@ static void osd_key_fini(const struct lu_context *ctx, { struct osd_thread_info *info = data; + lu_buf_free(&info->oti_xattr_lbuf); OBD_FREE_PTR(info); } static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) { - struct osd_thread_info *info = data; - - memset(info, 0, sizeof(*info)); } struct lu_context_key osd_key = { diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index e340e6f..56203e6 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -196,6 +196,7 @@ struct osd_thread_info { struct lquota_id_info oti_qi; struct lu_seq_range oti_seq_range; + struct lu_buf oti_xattr_lbuf; }; extern struct lu_context_key osd_key; @@ -211,7 +212,6 @@ struct osd_thandle { struct list_head ot_stop_dcb_list; struct list_head ot_unlinked_list; struct list_head ot_sa_list; - struct semaphore ot_sa_lock; dmu_tx_t *ot_tx; struct lquota_trans ot_quota_trans; __u32 ot_write_commit:1, @@ -343,7 +343,8 @@ struct osd_object { uint64_t oo_xattr; enum osd_destroy_type oo_destroy; - __u32 oo_destroyed:1; + __u32 oo_destroyed:1, + oo_late_xattr:1; /* the i_flags in LMA */ __u32 oo_lma_flags; @@ -455,7 +456,8 @@ int osd_procfs_fini(struct osd_device *osd); /* osd_object.c */ extern char *osd_obj_tag; -void osd_object_sa_dirty_rele(struct osd_thandle *oh); +void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh); +void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh); int __osd_obj2dbuf(const struct lu_env *env, objset_t *os, uint64_t oid, dmu_buf_t **dbp); struct lu_object *osd_object_alloc(const struct lu_env *env, @@ -493,6 +495,8 @@ void osd_zap_cursor_fini(zap_cursor_t *zc); uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc); /* osd_xattr.c */ +int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh); int __osd_xattr_load(struct osd_device *osd, uint64_t dnode, nvlist_t **sa_xattr); int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd, diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index a1d3ab7..912e91d 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -112,37 +112,49 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o) /* * Add object to list of dirty objects in tx handle. */ -static void -osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh) +void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh) { if (!list_empty(&obj->oo_sa_linkage)) return; - down(&oh->ot_sa_lock); write_lock(&obj->oo_attr_lock); if (likely(list_empty(&obj->oo_sa_linkage))) list_add(&obj->oo_sa_linkage, &oh->ot_sa_list); write_unlock(&obj->oo_attr_lock); - up(&oh->ot_sa_lock); } /* * Release spill block dbuf hold for all dirty SAs. */ -void osd_object_sa_dirty_rele(struct osd_thandle *oh) +void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh) { struct osd_object *obj; - down(&oh->ot_sa_lock); while (!list_empty(&oh->ot_sa_list)) { obj = list_entry(oh->ot_sa_list.next, struct osd_object, oo_sa_linkage); - sa_spill_rele(obj->oo_sa_hdl); write_lock(&obj->oo_attr_lock); list_del_init(&obj->oo_sa_linkage); + if (obj->oo_late_xattr) { + LASSERT(oh->ot_assigned != 0); + /* we need oo_guard to protect oo_sa_xattr + * consistency, but if we just take it, then + * we break lock ordering. in majority of + * cases the callers don't try to set EAs + * very often in concurrent manner, so hopefully + * we don't need to retake locks too often */ + if (down_read_trylock(&obj->oo_guard)) { + __osd_sa_xattr_update(env, obj, oh); + up_read(&obj->oo_guard); + } else { + down_read(&obj->oo_guard); + __osd_sa_xattr_update(env, obj, oh); + up_read(&obj->oo_guard); + } + } + sa_spill_rele(obj->oo_sa_hdl); write_unlock(&obj->oo_attr_lock); } - up(&oh->ot_sa_lock); } /* diff --git a/lustre/osd-zfs/osd_xattr.c b/lustre/osd-zfs/osd_xattr.c index 1be8363..9fd2437 100644 --- a/lustre/osd-zfs/osd_xattr.c +++ b/lustre/osd-zfs/osd_xattr.c @@ -353,6 +353,41 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, RETURN(0); } +int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, + struct osd_thandle *oh) +{ + struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf; + struct osd_device *osd = osd_obj2dev(obj); + char *dxattr; + size_t size; + int rc; + + obj->oo_late_xattr = 0; + + /* Update the SA for additions, modifications, and removals. */ + rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR); + if (rc) + return rc; + + lu_buf_check_and_alloc(lb, size); + if (lb->lb_buf == NULL) { + CERROR("%s: can't allocate buffer for xattr update\n", + osd->od_svname); + return -ENOMEM; + } + + dxattr = lb->lb_buf; + rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size, + NV_ENCODE_XDR, KM_SLEEP); + if (rc) + return rc; + LASSERT(dxattr == lb->lb_buf); + + sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx); + + return 0; +} + /* * Set an extended attribute. * This transaction must have called udmu_xattr_declare_set() first. @@ -361,43 +396,27 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, * * No locking is done here. */ -static int -__osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj, - struct osd_thandle *oh) +int __osd_sa_xattr_schedule_update(const struct lu_env *env, + struct osd_object *obj, + struct osd_thandle *oh) { - struct osd_device *osd = osd_obj2dev(obj); - char *dxattr; - size_t sa_size; - int rc; - ENTRY; LASSERT(obj->oo_sa_hdl); LASSERT(obj->oo_sa_xattr); - /* Update the SA for additions, modifications, and removals. */ - rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR); - if (rc) - return rc; - - dxattr = osd_zio_buf_alloc(sa_size); - if (dxattr == NULL) - RETURN(-ENOMEM); + /* schedule batched SA update in osd_object_sa_dirty_rele() */ + obj->oo_late_xattr = 1; + osd_object_sa_dirty_add(obj, oh); - rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size, - NV_ENCODE_XDR, KM_SLEEP); - if (rc) - GOTO(out_free, rc); + RETURN(0); - rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh); -out_free: - osd_zio_buf_free(dxattr, sa_size); - RETURN(rc); } int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, const struct lu_buf *buf, const char *name, int fl, struct osd_thandle *oh) { + dmu_buf_impl_t *db; uchar_t *nv_value; size_t size; int nv_size; @@ -438,7 +457,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, DATA_TYPE_BYTE_ARRAY); if (rc < 0) return rc; - rc = __osd_sa_xattr_update(env, obj, oh); + rc = __osd_sa_xattr_schedule_update(env, obj, oh); return rc == 0 ? -EFBIG : rc; } } else if (rc == -ENOENT) { @@ -469,7 +488,14 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj, if (rc) return rc; - rc = __osd_sa_xattr_update(env, obj, oh); + /* batch updates only for just created dnodes where we + * used to set number of EAs in a single transaction */ + db = (dmu_buf_impl_t *)obj->oo_db; + if (DB_DNODE(db)->dn_allocated_txg == oh->ot_tx->tx_txg) + rc = __osd_sa_xattr_schedule_update(env, obj, oh); + else + rc = __osd_sa_xattr_update(env, obj, oh); + return rc; } @@ -690,7 +716,7 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj, rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY); if (rc == 0) - rc = __osd_sa_xattr_update(env, obj, oh); + rc = __osd_sa_xattr_schedule_update(env, obj, oh); return rc; } -- 1.8.3.1