Whamcloud - gitweb
LU-7899 osd: batch EA updates 43/19143/5
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Fri, 25 Mar 2016 09:21:16 +0000 (12:21 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 11 Jul 2016 23:56:08 +0000 (23:56 +0000)
during file creation we set number of EAs: LMA, VBR, LinkEA, LOVEA, ACLs.
calling into SA to refill spill again and again is expensive. thus it
makes sense to postpone this to osd_trans_stop() where all changed EAs
has been already collected in a temporary buffer.

Change-Id: I8f02a287b96615c3aa550d63ffd9dd3da51b39ee
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/19143
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_xattr.c

index 5d1ae30..f7c831f 100644 (file)
@@ -288,7 +288,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
-               osd_object_sa_dirty_rele(oh);
+               osd_object_sa_dirty_rele(env, oh);
                osd_unlinked_list_emptify(osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                osd_unlinked_list_emptify(osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
@@ -317,7 +317,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        LASSERT(oh->ot_tx);
        txg = oh->ot_tx->tx_txg;
 
        LASSERT(oh->ot_tx);
        txg = oh->ot_tx->tx_txg;
 
-       osd_object_sa_dirty_rele(oh);
+       osd_object_sa_dirty_rele(env, oh);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
@@ -355,7 +355,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
-       sema_init(&oh->ot_sa_lock, 1);
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
@@ -680,15 +679,13 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
        struct osd_thread_info *info = data;
 
 {
        struct osd_thread_info *info = data;
 
+       lu_buf_free(&info->oti_xattr_lbuf);
        OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
        OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
-       struct osd_thread_info *info = data;
-
-       memset(info, 0, sizeof(*info));
 }
 
 struct lu_context_key osd_key = {
 }
 
 struct lu_context_key osd_key = {
index e340e6f..56203e6 100644 (file)
@@ -196,6 +196,7 @@ struct osd_thread_info {
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
+       struct lu_buf            oti_xattr_lbuf;
 };
 
 extern struct lu_context_key osd_key;
 };
 
 extern struct lu_context_key osd_key;
@@ -211,7 +212,6 @@ struct osd_thandle {
        struct list_head         ot_stop_dcb_list;
        struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
        struct list_head         ot_stop_dcb_list;
        struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
-       struct semaphore         ot_sa_lock;
        dmu_tx_t                *ot_tx;
        struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
        dmu_tx_t                *ot_tx;
        struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
@@ -343,7 +343,8 @@ struct osd_object {
        uint64_t                 oo_xattr;
        enum osd_destroy_type    oo_destroy;
 
        uint64_t                 oo_xattr;
        enum osd_destroy_type    oo_destroy;
 
-       __u32                    oo_destroyed:1;
+       __u32                    oo_destroyed:1,
+                                oo_late_xattr:1;
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
@@ -455,7 +456,8 @@ int osd_procfs_fini(struct osd_device *osd);
 
 /* osd_object.c */
 extern char *osd_obj_tag;
 
 /* osd_object.c */
 extern char *osd_obj_tag;
-void osd_object_sa_dirty_rele(struct osd_thandle *oh);
+void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh);
+void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh);
 int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
                   uint64_t oid, dmu_buf_t **dbp);
 struct lu_object *osd_object_alloc(const struct lu_env *env,
 int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
                   uint64_t oid, dmu_buf_t **dbp);
 struct lu_object *osd_object_alloc(const struct lu_env *env,
@@ -493,6 +495,8 @@ void osd_zap_cursor_fini(zap_cursor_t *zc);
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
 
 /* osd_xattr.c */
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
 
 /* osd_xattr.c */
+int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                         struct osd_thandle *oh);
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
                     nvlist_t **sa_xattr);
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
                     nvlist_t **sa_xattr);
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
index a1d3ab7..912e91d 100644 (file)
@@ -112,37 +112,49 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
 /*
  * Add object to list of dirty objects in tx handle.
  */
 /*
  * Add object to list of dirty objects in tx handle.
  */
-static void
-osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
+void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
 {
        if (!list_empty(&obj->oo_sa_linkage))
                return;
 
 {
        if (!list_empty(&obj->oo_sa_linkage))
                return;
 
-       down(&oh->ot_sa_lock);
        write_lock(&obj->oo_attr_lock);
        if (likely(list_empty(&obj->oo_sa_linkage)))
                list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
        write_unlock(&obj->oo_attr_lock);
        write_lock(&obj->oo_attr_lock);
        if (likely(list_empty(&obj->oo_sa_linkage)))
                list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
        write_unlock(&obj->oo_attr_lock);
-       up(&oh->ot_sa_lock);
 }
 
 /*
  * Release spill block dbuf hold for all dirty SAs.
  */
 }
 
 /*
  * Release spill block dbuf hold for all dirty SAs.
  */
-void osd_object_sa_dirty_rele(struct osd_thandle *oh)
+void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
 {
        struct osd_object *obj;
 
 {
        struct osd_object *obj;
 
-       down(&oh->ot_sa_lock);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
                                 struct osd_object, oo_sa_linkage);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
                                 struct osd_object, oo_sa_linkage);
-               sa_spill_rele(obj->oo_sa_hdl);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
+               if (obj->oo_late_xattr) {
+                       LASSERT(oh->ot_assigned != 0);
+                       /* we need oo_guard to protect oo_sa_xattr
+                        * consistency, but if we just take it, then
+                        * we break lock ordering. in majority of
+                        * cases the callers don't try to set EAs
+                        * very often in concurrent manner, so hopefully
+                        * we don't need to retake locks too often */
+                       if (down_read_trylock(&obj->oo_guard)) {
+                               __osd_sa_xattr_update(env, obj, oh);
+                               up_read(&obj->oo_guard);
+                       } else {
+                               down_read(&obj->oo_guard);
+                               __osd_sa_xattr_update(env, obj, oh);
+                               up_read(&obj->oo_guard);
+                       }
+               }
+               sa_spill_rele(obj->oo_sa_hdl);
                write_unlock(&obj->oo_attr_lock);
        }
                write_unlock(&obj->oo_attr_lock);
        }
-       up(&oh->ot_sa_lock);
 }
 
 /*
 }
 
 /*
index 1be8363..9fd2437 100644 (file)
@@ -353,6 +353,41 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
+int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                          struct osd_thandle *oh)
+{
+       struct lu_buf     *lb = &osd_oti_get(env)->oti_xattr_lbuf;
+       struct osd_device *osd = osd_obj2dev(obj);
+       char              *dxattr;
+       size_t             size;
+       int                rc;
+
+       obj->oo_late_xattr = 0;
+
+       /* Update the SA for additions, modifications, and removals. */
+       rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
+       if (rc)
+               return rc;
+
+       lu_buf_check_and_alloc(lb, size);
+       if (lb->lb_buf == NULL) {
+               CERROR("%s: can't allocate buffer for xattr update\n",
+                               osd->od_svname);
+               return -ENOMEM;
+       }
+
+       dxattr = lb->lb_buf;
+       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size,
+                       NV_ENCODE_XDR, KM_SLEEP);
+       if (rc)
+               return rc;
+       LASSERT(dxattr == lb->lb_buf);
+
+       sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx);
+
+       return 0;
+}
+
 /*
  * Set an extended attribute.
  * This transaction must have called udmu_xattr_declare_set() first.
 /*
  * Set an extended attribute.
  * This transaction must have called udmu_xattr_declare_set() first.
@@ -361,43 +396,27 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
  *
  * No locking is done here.
  */
  *
  * No locking is done here.
  */
-static int
-__osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
-                     struct osd_thandle *oh)
+int __osd_sa_xattr_schedule_update(const struct lu_env *env,
+                                  struct osd_object *obj,
+                                  struct osd_thandle *oh)
 {
 {
-       struct osd_device *osd = osd_obj2dev(obj);
-       char              *dxattr;
-       size_t             sa_size;
-       int                rc;
-
        ENTRY;
        LASSERT(obj->oo_sa_hdl);
        LASSERT(obj->oo_sa_xattr);
 
        ENTRY;
        LASSERT(obj->oo_sa_hdl);
        LASSERT(obj->oo_sa_xattr);
 
-       /* Update the SA for additions, modifications, and removals. */
-       rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
-       if (rc)
-               return rc;
-
-       dxattr = osd_zio_buf_alloc(sa_size);
-       if (dxattr == NULL)
-               RETURN(-ENOMEM);
+       /* schedule batched SA update in osd_object_sa_dirty_rele() */
+       obj->oo_late_xattr = 1;
+       osd_object_sa_dirty_add(obj, oh);
 
 
-       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
-                               NV_ENCODE_XDR, KM_SLEEP);
-       if (rc)
-               GOTO(out_free, rc);
+       RETURN(0);
 
 
-       rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh);
-out_free:
-       osd_zio_buf_free(dxattr, sa_size);
-       RETURN(rc);
 }
 
 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
                       struct osd_thandle *oh)
 {
 }
 
 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
                       struct osd_thandle *oh)
 {
+       dmu_buf_impl_t *db;
        uchar_t *nv_value;
        size_t  size;
        int     nv_size;
        uchar_t *nv_value;
        size_t  size;
        int     nv_size;
@@ -438,7 +457,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                                                DATA_TYPE_BYTE_ARRAY);
                        if (rc < 0)
                                return rc;
                                                DATA_TYPE_BYTE_ARRAY);
                        if (rc < 0)
                                return rc;
-                       rc = __osd_sa_xattr_update(env, obj, oh);
+                       rc = __osd_sa_xattr_schedule_update(env, obj, oh);
                        return rc == 0 ? -EFBIG : rc;
                }
        } else if (rc == -ENOENT) {
                        return rc == 0 ? -EFBIG : rc;
                }
        } else if (rc == -ENOENT) {
@@ -469,7 +488,14 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-       rc = __osd_sa_xattr_update(env, obj, oh);
+       /* batch updates only for just created dnodes where we
+        * used to set number of EAs in a single transaction */
+       db = (dmu_buf_impl_t *)obj->oo_db;
+       if (DB_DNODE(db)->dn_allocated_txg == oh->ot_tx->tx_txg)
+               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+       else
+               rc = __osd_sa_xattr_update(env, obj, oh);
+
        return rc;
 }
 
        return rc;
 }
 
@@ -690,7 +716,7 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
        if (rc == 0)
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
        if (rc == 0)
-               rc = __osd_sa_xattr_update(env, obj, oh);
+               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
        return rc;
 }
 
        return rc;
 }