Whamcloud - gitweb
LU-7899 osd: batch EA updates 82/28482/2
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Tue, 28 Feb 2017 09:44:14 +0000 (12:44 +0300)
committerJohn L. Hammond <john.hammond@intel.com>
Mon, 21 Aug 2017 20:25:30 +0000 (20:25 +0000)
during file creation we set number of EAs: LMA, VBR, LinkEA, LOVEA, ACLs.
calling into SA to refill spill again and again is expensive. thus it
makes sense to postpone this to osd_trans_stop() where all changed EAs
has been already collected in a temporary buffer.

Lustre-change: https://review.whamcloud.com/21893
Lustre-commit: 2c9ff6dffdf4320af95c9db9af07a416529275f0

Change-Id: Ia2604ddafdf8b2ca4f6db4d70ead6d2d2761cd26
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Signed-off-by: Minh Diep <minh.diep@intel.com>
Reviewed-on: https://review.whamcloud.com/28482
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osd-zfs/osd_xattr.c

index bcb8705..8d9fa0c 100644 (file)
@@ -292,7 +292,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
                dmu_tx_abort(oh->ot_tx);
-               osd_object_sa_dirty_rele(oh);
+               osd_object_sa_dirty_rele(env, oh);
                osd_unlinked_list_emptify(env, osd, &unlinked, false);
                /* there won't be any commit, release reserved quota space now,
                 * if any */
@@ -311,7 +311,7 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        LASSERT(oh->ot_tx);
        txg = oh->ot_tx->tx_txg;
 
-       osd_object_sa_dirty_rele(oh);
+       osd_object_sa_dirty_rele(env, oh);
        /* XXX: Once dmu_tx_commit() called, oh/th could have been freed
         * by osd_trans_commit_cb already. */
        dmu_tx_commit(oh->ot_tx);
@@ -357,7 +357,6 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
        INIT_LIST_HEAD(&oh->ot_unlinked_list);
        INIT_LIST_HEAD(&oh->ot_sa_list);
-       sema_init(&oh->ot_sa_lock, 1);
        memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
        th = &oh->ot_super;
        th->th_dev = dt;
@@ -716,6 +715,7 @@ static void osd_key_fini(const struct lu_context *ctx,
                info->oti_ins_cache = NULL;
                info->oti_ins_cache_size = 0;
        }
+       lu_buf_free(&info->oti_xattr_lbuf);
        OBD_FREE_PTR(info);
 }
 
index 89c3223..d7e6e99 100644 (file)
@@ -213,6 +213,7 @@ struct osd_thread_info {
        struct osd_idmap_cache *oti_ins_cache;
        int                    oti_ins_cache_size;
        int                    oti_ins_cache_used;
+       struct lu_buf          oti_xattr_lbuf;
 };
 
 extern struct lu_context_key osd_key;
@@ -228,7 +229,6 @@ struct osd_thandle {
        struct list_head         ot_stop_dcb_list;
        struct list_head         ot_unlinked_list;
        struct list_head         ot_sa_list;
-       struct semaphore         ot_sa_lock;
        dmu_tx_t                *ot_tx;
        struct lquota_trans      ot_quota_trans;
        __u32                    ot_write_commit:1,
@@ -358,7 +358,9 @@ struct osd_object {
        uint64_t                 oo_xattr;
        enum osd_destroy_type    oo_destroy;
 
-       __u32                    oo_destroyed:1;
+       __u32                    oo_destroyed:1,
+                                oo_late_xattr:1,
+                                oo_late_attr_set:1;
 
        /* the i_flags in LMA */
        __u32                    oo_lma_flags;
@@ -370,6 +372,7 @@ struct osd_object {
                        unsigned char            oo_recsize;
                        unsigned char            oo_recusize;   /* unit size */
                };
+               uint64_t        oo_parent; /* used only at object creation */
        };
 };
 
@@ -487,8 +490,11 @@ int osd_procfs_fini(struct osd_device *osd);
 
 /* osd_object.c */
 extern char *osd_obj_tag;
-void osd_object_sa_dirty_rele(struct osd_thandle *oh);
 int __osd_obj2dnode(objset_t *os, uint64_t oid, dnode_t **dnp);
+void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh);
+void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh);
+int __osd_obj2dbuf(const struct lu_env *env, objset_t *os,
+                  uint64_t oid, dmu_buf_t **dbp);
 struct lu_object *osd_object_alloc(const struct lu_env *env,
                                   const struct lu_object_header *hdr,
                                   struct lu_device *d);
@@ -501,7 +507,7 @@ int __osd_object_create(const struct lu_env *env, struct osd_object *obj,
                        dnode_t **dnp, dmu_tx_t *tx, struct lu_attr *la);
 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
                    sa_handle_t *sa_hdl, dmu_tx_t *tx,
-                   struct lu_attr *la, uint64_t parent);
+                   struct lu_attr *la, uint64_t parent, nvlist_t *);
 
 /* osd_oi.c */
 int osd_oi_init(const struct lu_env *env, struct osd_device *o);
@@ -538,7 +544,15 @@ int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
                   const struct lu_fid *fid);
 
 /* osd_xattr.c */
-int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl, nvlist_t **sa);
+int __osd_sa_xattr_schedule_update(const struct lu_env *env,
+                                  struct osd_object *obj,
+                                  struct osd_thandle *oh);
+int __osd_sa_attr_init(const struct lu_env *env, struct osd_object *obj,
+                      struct osd_thandle *oh);
+int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                         struct osd_thandle *oh);
+int __osd_xattr_load(struct osd_device *osd, sa_handle_t *hdl,
+                    nvlist_t **sa);
 int __osd_xattr_get_large(const struct lu_env *env, struct osd_device *osd,
                          uint64_t xattr, struct lu_buf *buf,
                          const char *name, int *sizep);
index 2ed4a18..c68f8de 100644 (file)
@@ -107,37 +107,45 @@ osd_object_sa_init(struct osd_object *obj, struct osd_device *o)
 /*
  * Add object to list of dirty objects in tx handle.
  */
-static void
-osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
+void osd_object_sa_dirty_add(struct osd_object *obj, struct osd_thandle *oh)
 {
        if (!list_empty(&obj->oo_sa_linkage))
                return;
 
-       down(&oh->ot_sa_lock);
        write_lock(&obj->oo_attr_lock);
        if (likely(list_empty(&obj->oo_sa_linkage)))
                list_add(&obj->oo_sa_linkage, &oh->ot_sa_list);
        write_unlock(&obj->oo_attr_lock);
-       up(&oh->ot_sa_lock);
 }
 
 /*
  * Release spill block dbuf hold for all dirty SAs.
  */
-void osd_object_sa_dirty_rele(struct osd_thandle *oh)
+void osd_object_sa_dirty_rele(const struct lu_env *env, struct osd_thandle *oh)
 {
        struct osd_object *obj;
 
-       down(&oh->ot_sa_lock);
        while (!list_empty(&oh->ot_sa_list)) {
                obj = list_entry(oh->ot_sa_list.next,
                                 struct osd_object, oo_sa_linkage);
-               sa_spill_rele(obj->oo_sa_hdl);
                write_lock(&obj->oo_attr_lock);
                list_del_init(&obj->oo_sa_linkage);
                write_unlock(&obj->oo_attr_lock);
+               if (obj->oo_late_xattr) {
+                       /*
+                        * take oo_guard to protect oo_sa_xattr buffer
+                        * from concurrent update by osd_xattr_set()
+                        */
+                       LASSERT(oh->ot_assigned != 0);
+                       down_write(&obj->oo_guard);
+                       if (obj->oo_late_attr_set)
+                               __osd_sa_attr_init(env, obj, oh);
+                       else if (obj->oo_late_xattr)
+                               __osd_sa_xattr_update(env, obj, oh);
+                       up_write(&obj->oo_guard);
+               }
+               sa_spill_rele(obj->oo_sa_hdl);
        }
-       up(&oh->ot_sa_lock);
 }
 
 /*
@@ -1181,7 +1189,8 @@ static int osd_declare_create(const struct lu_env *env, struct dt_object *dt,
 
 int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
                    sa_handle_t *sa_hdl, dmu_tx_t *tx,
-                   struct lu_attr *la, uint64_t parent)
+                   struct lu_attr *la, uint64_t parent,
+                   nvlist_t *xattr)
 {
        sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
        struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
@@ -1190,6 +1199,9 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        timestruc_t      now;
        int              cnt;
        int              rc;
+       char *dxattr = NULL;
+       size_t sa_size;
+
 
        LASSERT(sa_hdl);
 
@@ -1234,7 +1246,24 @@ int __osd_attr_init(const struct lu_env *env, struct osd_device *osd,
        SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
        LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
 
+       if (xattr) {
+               rc = -nvlist_size(xattr, &sa_size, NV_ENCODE_XDR);
+               LASSERT(rc == 0);
+
+               dxattr = osd_zio_buf_alloc(sa_size);
+               LASSERT(dxattr);
+
+               rc = -nvlist_pack(xattr, &dxattr, &sa_size,
+                               NV_ENCODE_XDR, KM_SLEEP);
+               LASSERT(rc == 0);
+
+               SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd),
+                               NULL, dxattr, sa_size);
+       }
+
        rc = -sa_replace_all_by_template(sa_hdl, bulk, cnt, tx);
+       if (dxattr)
+               osd_zio_buf_free(dxattr, sa_size);
 
        return rc;
 }
@@ -1536,17 +1565,6 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt,
        if (rc)
                GOTO(out, rc);
 
-       /* configure new osd object */
-       parent = parent != 0 ? parent : zapid;
-       rc = __osd_attr_init(env, osd, obj->oo_sa_hdl, oh->ot_tx,
-                            &obj->oo_attr, parent);
-       if (rc)
-               GOTO(out, rc);
-
-       /* XXX: oo_lma_flags */
-       obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
-       obj->oo_dt.do_body_ops = &osd_body_ops;
-
        rc = -nvlist_alloc(&obj->oo_sa_xattr, NV_UNIQUE_NAME, KM_SLEEP);
        if (rc)
                GOTO(out, rc);
@@ -1558,9 +1576,20 @@ static int osd_create(const struct lu_env *env, struct dt_object *dt,
                                    (uchar_t *)lma, sizeof(*lma));
        if (rc)
                GOTO(out, rc);
-       rc = __osd_sa_xattr_update(env, obj, oh);
+
+       /* configure new osd object */
+       obj->oo_parent = parent != 0 ? parent : zapid;
+       obj->oo_late_attr_set = 1;
+       rc = __osd_sa_xattr_schedule_update(env, obj, oh);
        if (rc)
                GOTO(out, rc);
+
+       /* XXX: oo_lma_flags */
+       obj->oo_dt.do_lu.lo_header->loh_attr |= obj->oo_attr.la_mode & S_IFMT;
+       if (likely(!fid_is_acct(lu_object_fid(&obj->oo_dt.do_lu))))
+               /* no body operations for accounting objects */
+               obj->oo_dt.do_body_ops = &osd_body_ops;
+
        osd_idc_find_and_init(env, osd, obj);
 
 out:
index 92d5452..a9de9e3 100644 (file)
@@ -177,7 +177,7 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
        la->la_valid = LA_MODE | LA_UID | LA_GID;
        la->la_mode = S_IFDIR | S_IRUGO | S_IWUSR | S_IXUGO;
        la->la_uid = la->la_gid = 0;
-       rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent);
+       rc = __osd_attr_init(env, o, sa_hdl, tx, la, parent, NULL);
        sa_handle_destroy(sa_hdl);
        if (rc)
                goto commit;
index f643f8a..fac11ba 100644 (file)
@@ -347,6 +347,112 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
+int __osd_sa_attr_init(const struct lu_env *env, struct osd_object *obj,
+                      struct osd_thandle *oh)
+{
+       sa_bulk_attr_t  *bulk = osd_oti_get(env)->oti_attr_bulk;
+       struct osa_attr *osa = &osd_oti_get(env)->oti_osa;
+       struct lu_buf *lb = &osd_oti_get(env)->oti_xattr_lbuf;
+       struct osd_device *osd = osd_obj2dev(obj);
+       uint64_t crtime[2], gen;
+       timestruc_t now;
+       size_t size;
+       int rc, cnt;
+
+       obj->oo_late_xattr = 0;
+       obj->oo_late_attr_set = 0;
+
+       gen = dmu_tx_get_txg(oh->ot_tx);
+       gethrestime(&now);
+       ZFS_TIME_ENCODE(&now, crtime);
+
+       osa->atime[0] = obj->oo_attr.la_atime;
+       osa->ctime[0] = obj->oo_attr.la_ctime;
+       osa->mtime[0] = obj->oo_attr.la_mtime;
+       osa->mode = obj->oo_attr.la_mode;
+       osa->uid = obj->oo_attr.la_uid;
+       osa->gid = obj->oo_attr.la_gid;
+       osa->rdev = obj->oo_attr.la_rdev;
+       osa->nlink = obj->oo_attr.la_nlink;
+       osa->flags = attrs_fs2zfs(obj->oo_attr.la_flags);
+       osa->size  = obj->oo_attr.la_size;
+
+       cnt = 0;
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MODE(osd), NULL, &osa->mode, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_SIZE(osd), NULL, &osa->size, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GEN(osd), NULL, &gen, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_UID(osd), NULL, &osa->uid, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_GID(osd), NULL, &osa->gid, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_FLAGS(osd), NULL, &osa->flags, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_ATIME(osd), NULL, osa->atime, 16);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_MTIME(osd), NULL, osa->mtime, 16);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CTIME(osd), NULL, osa->ctime, 16);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_CRTIME(osd), NULL, crtime, 16);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_LINKS(osd), NULL, &osa->nlink, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_RDEV(osd), NULL, &osa->rdev, 8);
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_PARENT(osd), NULL,
+                        &obj->oo_parent, 8);
+       LASSERT(cnt <= ARRAY_SIZE(osd_oti_get(env)->oti_attr_bulk));
+
+       /* Update the SA for additions, modifications, and removals. */
+       rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
+       if (rc)
+               return rc;
+
+       lu_buf_check_and_alloc(lb, size);
+       if (lb->lb_buf == NULL) {
+               CERROR("%s: can't allocate buffer for xattr update\n",
+                               osd->od_svname);
+               return -ENOMEM;
+       }
+
+       rc = -nvlist_pack(obj->oo_sa_xattr, (char **)&lb->lb_buf, &size,
+                         NV_ENCODE_XDR, KM_SLEEP);
+       if (rc)
+               return rc;
+
+       SA_ADD_BULK_ATTR(bulk, cnt, SA_ZPL_DXATTR(osd), NULL, lb->lb_buf, size);
+
+       rc = -sa_replace_all_by_template(obj->oo_sa_hdl, bulk, cnt, oh->ot_tx);
+
+       return rc;
+}
+
+int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
+                          struct osd_thandle *oh)
+{
+       struct lu_buf     *lb = &osd_oti_get(env)->oti_xattr_lbuf;
+       struct osd_device *osd = osd_obj2dev(obj);
+       char              *dxattr;
+       size_t             size;
+       int                rc;
+
+       obj->oo_late_xattr = 0;
+
+       /* Update the SA for additions, modifications, and removals. */
+       rc = -nvlist_size(obj->oo_sa_xattr, &size, NV_ENCODE_XDR);
+       if (rc)
+               return rc;
+
+       lu_buf_check_and_alloc(lb, size);
+       if (lb->lb_buf == NULL) {
+               CERROR("%s: can't allocate buffer for xattr update\n",
+                               osd->od_svname);
+               return -ENOMEM;
+       }
+
+       dxattr = lb->lb_buf;
+       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &size,
+                       NV_ENCODE_XDR, KM_SLEEP);
+       if (rc)
+               return rc;
+       LASSERT(dxattr == lb->lb_buf);
+
+       sa_update(obj->oo_sa_hdl, SA_ZPL_DXATTR(osd), dxattr, size, oh->ot_tx);
+
+       return 0;
+}
+
 /*
  * Set an extended attribute.
  * This transaction must have called udmu_xattr_declare_set() first.
@@ -355,36 +461,20 @@ int osd_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
  *
  * No locking is done here.
  */
-int __osd_sa_xattr_update(const struct lu_env *env, struct osd_object *obj,
-                     struct osd_thandle *oh)
+int __osd_sa_xattr_schedule_update(const struct lu_env *env,
+                                  struct osd_object *obj,
+                                  struct osd_thandle *oh)
 {
-       struct osd_device *osd = osd_obj2dev(obj);
-       char              *dxattr;
-       size_t             sa_size;
-       int                rc;
-
        ENTRY;
        LASSERT(obj->oo_sa_hdl);
        LASSERT(obj->oo_sa_xattr);
 
-       /* Update the SA for additions, modifications, and removals. */
-       rc = -nvlist_size(obj->oo_sa_xattr, &sa_size, NV_ENCODE_XDR);
-       if (rc)
-               return rc;
-
-       dxattr = osd_zio_buf_alloc(sa_size);
-       if (dxattr == NULL)
-               RETURN(-ENOMEM);
+       /* schedule batched SA update in osd_object_sa_dirty_rele() */
+       obj->oo_late_xattr = 1;
+       osd_object_sa_dirty_add(obj, oh);
 
-       rc = -nvlist_pack(obj->oo_sa_xattr, &dxattr, &sa_size,
-                               NV_ENCODE_XDR, KM_SLEEP);
-       if (rc)
-               GOTO(out_free, rc);
+       RETURN(0);
 
-       rc = osd_object_sa_update(obj, SA_ZPL_DXATTR(osd), dxattr, sa_size, oh);
-out_free:
-       osd_zio_buf_free(dxattr, sa_size);
-       RETURN(rc);
 }
 
 int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
@@ -428,7 +518,7 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
                                                DATA_TYPE_BYTE_ARRAY);
                        if (rc < 0)
                                return rc;
-                       rc = __osd_sa_xattr_update(env, obj, oh);
+                       rc = __osd_sa_xattr_schedule_update(env, obj, oh);
                        return rc == 0 ? -EFBIG : rc;
                }
        } else if (rc == -ENOENT) {
@@ -459,7 +549,13 @@ int __osd_sa_xattr_set(const struct lu_env *env, struct osd_object *obj,
        if (rc)
                return rc;
 
-       rc = __osd_sa_xattr_update(env, obj, oh);
+       /* batch updates only for just created dnodes where we
+        * used to set number of EAs in a single transaction */
+       if (obj->oo_dn->dn_allocated_txg == oh->ot_tx->tx_txg)
+               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
+       else
+               rc = __osd_sa_xattr_update(env, obj, oh);
+
        return rc;
 }
 
@@ -671,7 +767,7 @@ static int __osd_sa_xattr_del(const struct lu_env *env, struct osd_object *obj,
 
        rc = -nvlist_remove(obj->oo_sa_xattr, name, DATA_TYPE_BYTE_ARRAY);
        if (rc == 0)
-               rc = __osd_sa_xattr_update(env, obj, oh);
+               rc = __osd_sa_xattr_schedule_update(env, obj, oh);
        return rc;
 }