Whamcloud - gitweb
Revert "LU-7899 osd: batch EA updates" 78/21878/2
authorOleg Drokin <oleg.drokin@intel.com>
Thu, 11 Aug 2016 07:13:29 +0000 (07:13 +0000)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 11 Aug 2016 17:57:37 +0000 (17:57 +0000)
Reverting this patch as it seems to be causing OOM issus
documented in LU-8449 and there does not seem to be an
easy fix in sight.

This reverts commit 6cd79ab5860c59c2a640a9e8ca4ee86eec050b43.

Change-Id: I934af93d893b01dad7190471b6b1a7bdffb1b509
Reviewed-on: http://review.whamcloud.com/21878
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Oleg Drokin <oleg.drokin@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_xattr.c

index f7c831f..5d1ae30 100644 (file)
@@ -288,7 +288,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
-               osd_object_sa_dirty_rele(env, oh);
+               osd_object_sa_dirty_rele(oh);
                osd_unlinked_list_emptify(osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
                osd_unlinked_list_emptify(osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
@@ -317,7 +317,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        LASSERT(oh->ot_tx);
        txg = oh->ot_tx->tx_txg;
 
        LASSERT(oh->ot_tx);
        txg = oh->ot_tx->tx_txg;
 
-       osd_object_sa_dirty_rele(env, oh);
+       osd_object_sa_dirty_rele(oh);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
@@ -355,6 +355,7 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
+       sema_init(&oh->ot_sa_lock, 1);
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
@@ -679,13 +680,15 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
        struct osd_thread_info *info = data;
 
 {
        struct osd_thread_info *info = data;
 
-       lu_buf_free(&info->oti_xattr_lbuf);
        OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
        OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
+       struct osd_thread_info *info = data;
+
+       memset(info, 0, sizeof(*info));
 }
 
 struct lu_context_key osd_key = {
 }
 
 struct lu_context_key osd_key = {
index c39a461..20b1171 100644 (file)
@@ -196,7 +196,6 @@ struct osd_thread_info {
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
-       struct lu_buf            oti_xattr_lbuf;
 };
 
 extern struct lu_context_key osd_key;
 };
 
 extern struct lu_context_key osd_key;
@@ -212,6 +211,7 @@ struct osd_thandle {
        struct list_head         ot_stop_dcb_list;
        struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
        struct list_head         ot_stop_dcb_list;
        struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
+       struct semaphore         ot_sa_lock;
        dmu_tx_t                *ot_tx;
        struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
        dmu_tx_t                *ot_tx;
        struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
@@ -343,8 +343,7 @@ struct osd_object {
        uint64_t                 oo_xattr;
        enum osd_destroy_type    oo_destroy;
 
        uint64_t                 oo_xattr;
        enum osd_destroy_type    oo_destroy;
 
-       __u32                    oo_destroyed:1,
-                                oo_late_xattr:1;
+       __u32                    oo_destroyed:1;
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
@@ -456,8 +455,7 @@ int osd_procfs_fini(struct osd_device *osd);
 
 /* osd_object.c */
 extern char *osd_obj_tag;
 
 /* osd_object.c */
 extern char *osd_obj_tag;
-void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh);
-void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh);
+void osd_object_sa_dirty_rele(struct osd_thandle *oh);
 int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
                   uint64_t oid, dmu_buf_t **dbp);
 struct lu_object *osd_object_alloc(const struct lu_env *env,
 int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
                   uint64_t oid, dmu_buf_t **dbp);
 struct lu_object *osd_object_alloc(const struct lu_env *env,
@@ -495,8 +493,6 @@ void osd_zap_cursor_fini(zap_cursor_t *zc);
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
 
 /* osd_xattr.c */
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
 
 /* osd_xattr.c */
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
-                         struct osd_thandle *oh);
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
                     nvlist_t **sa_xattr);
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
                     nvlist_t **sa_xattr);
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
index 912e91d..a1d3ab7 100644 (file)
@@ -112,49 +112,37 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
 /*
  * Add object to list of dirty objects in tx handle.
  */
 /*
  * Add object to list of dirty objects in tx handle.
  */
-void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
+static void
+osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
 {
        if (!list_empty(&obj->oo_sa_linkage))
                return;
 
 {
        if (!list_empty(&obj->oo_sa_linkage))
                return;
 
+       down(&oh->ot_sa_lock);
        write_lock(&obj->oo_attr_lock);
        if (likely(list_empty(&obj->oo_sa_linkage)))
                list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
        write_unlock(&obj->oo_attr_lock);
        write_lock(&obj->oo_attr_lock);
        if (likely(list_empty(&obj->oo_sa_linkage)))
                list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
        write_unlock(&obj->oo_attr_lock);
+       up(&oh->ot_sa_lock);
 }
 
 /*
  * Release spill block dbuf hold for all dirty SAs.
  */
 }
 
 /*
  * Release spill block dbuf hold for all dirty SAs.
  */
-void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
+void osd_object_sa_dirty_rele(struct osd_thandle *oh)
 {
        struct osd_object *obj;
 
 {
        struct osd_object *obj;
 
+       down(&oh->ot_sa_lock);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
                                 struct osd_object, oo_sa_linkage);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
                                 struct osd_object, oo_sa_linkage);
+               sa_spill_rele(obj->oo_sa_hdl);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
-               if (obj->oo_late_xattr) {
-                       LASSERT(oh->ot_assigned != 0);
-                       /* we need oo_guard to protect oo_sa_xattr
-                        * consistency, but if we just take it, then
-                        * we break lock ordering. in majority of
-                        * cases the callers don't try to set EAs
-                        * very often in concurrent manner, so hopefully
-                        * we don't need to retake locks too often */
-                       if (down_read_trylock(&obj->oo_guard)) {
-                               __osd_sa_xattr_update(env, obj, oh);
-                               up_read(&obj->oo_guard);
-                       } else {
-                               down_read(&obj->oo_guard);
-                               __osd_sa_xattr_update(env, obj, oh);
-                               up_read(&obj->oo_guard);
-                       }
-               }
-               sa_spill_rele(obj->oo_sa_hdl);
                write_unlock(&obj->oo_attr_lock);
        }
                write_unlock(&obj->oo_attr_lock);
        }
+       up(&oh->ot_sa_lock);
 }
 
 /*
 }
 
 /*
index 7e5d932..218464f 100644 (file)
@@ -353,41 +353,6 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
-                          struct osd_thandle *oh)
-{
-       struct lu_buf     *lb = &osd_oti_get(env)->oti_xattr_lbuf;
-       struct osd_device *osd = osd_obj2dev(obj);
-       char              *dxattr;
-       size_t             size;
-       int                rc;
-
-       obj->oo_late_xattr = 0;
-
-       /* Update the SA for additions, modifications, and removals. */
-       rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
-       if (rc)
-               return rc;
-
-       lu_buf_check_and_alloc(lb, size);
-       if (lb->lb_buf == NULL) {
-               CERROR("%s: can't allocate buffer for xattr update\n",
-                               osd->od_svname);
-               return -ENOMEM;
-       }
-
-       dxattr = lb->lb_buf;
-       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size,
-                       NV_ENCODE_XDR, KM_SLEEP);
-       if (rc)
-               return rc;
-       LASSERT(dxattr == lb->lb_buf);
-
-       sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx);
-
-       return 0;
-}
-
 /*
  * Set an extended attribute.
  * This transaction must have called udmu_xattr_declare_set() first.
 /*
  * Set an extended attribute.
  * This transaction must have called udmu_xattr_declare_set() first.
@@ -396,27 +361,43 @@ int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
  *
  * No locking is done here.
  */
  *
  * No locking is done here.
  */
-int __osd_sa_xattr_schedule_update(const struct lu_env *env,
-                                  struct osd_object *obj,
-                                  struct osd_thandle *oh)
+static int
+__osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                     struct osd_thandle *oh)
 {
 {
+       struct osd_device *osd = osd_obj2dev(obj);
+       char              *dxattr;
+       size_t             sa_size;
+       int                rc;
+
        ENTRY;
        LASSERT(obj->oo_sa_hdl);
        LASSERT(obj->oo_sa_xattr);
 
        ENTRY;
        LASSERT(obj->oo_sa_hdl);
        LASSERT(obj->oo_sa_xattr);
 
-       /* schedule batched SA update in osd_object_sa_dirty_rele() */
-       obj->oo_late_xattr = 1;
-       osd_object_sa_dirty_add(obj, oh);
+       /* Update the SA for additions, modifications, and removals. */
+       rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
+       if (rc)
+               return rc;
 
 
-       RETURN(0);
+       dxattr = osd_zio_buf_alloc(sa_size);
+       if (dxattr == NULL)
+               RETURN(-ENOMEM);
+
+       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
+                               NV_ENCODE_XDR, KM_SLEEP);
+       if (rc)
+               GOTO(out_free, rc);
 
 
+       rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh);
+out_free:
+       osd_zio_buf_free(dxattr, sa_size);
+       RETURN(rc);
 }
 
 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
                       struct osd_thandle *oh)
 {
 }
 
 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                       const struct lu_buf *buf, const char *name, int fl,
                       struct osd_thandle *oh)
 {
-       dmu_buf_impl_t *db;
        uchar_t *nv_value;
        size_t  size;
        int     nv_size;
        uchar_t *nv_value;
        size_t  size;
        int     nv_size;
@@ -457,7 +438,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                                                DATA_TYPE_BYTE_ARRAY);
                        if (rc < 0)
                                return rc;
                                                DATA_TYPE_BYTE_ARRAY);
                        if (rc < 0)
                                return rc;
-                       rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+                       rc = __osd_sa_xattr_update(env, obj, oh);
                        return rc == 0 ? -EFBIG : rc;
                }
        } else if (rc == -ENOENT) {
                        return rc == 0 ? -EFBIG : rc;
                }
        } else if (rc == -ENOENT) {
@@ -488,14 +469,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
        if (rc)
                return rc;
 
        if (rc)
                return rc;
 
-       /* batch updates only for just created dnodes where we
-        * used to set number of EAs in a single transaction */
-       db = (dmu_buf_impl_t *)obj->oo_db;
-       if (DB_DNODE(db)->dn_allocated_txg == oh->ot_tx->tx_txg)
-               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
-       else
-               rc = __osd_sa_xattr_update(env, obj, oh);
-
+       rc = __osd_sa_xattr_update(env, obj, oh);
        return rc;
 }
 
        return rc;
 }
 
@@ -716,7 +690,7 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
        if (rc == 0)
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
        if (rc == 0)
-               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+               rc = __osd_sa_xattr_update(env, obj, oh);
        return rc;
 }
 
        return rc;
 }