Whamcloud - gitweb
LU-2600 osd-zfs: batched object accounting 57/7157/18
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Sun, 28 Jul 2013 18:12:29 +0000 (22:12 +0400)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 22 Apr 2014 06:37:21 +0000 (06:37 +0000)
using dsl_sync_task_do_nowait() we can schedule the updates
to be executed in a closing txg. this way object accounting
can be done in memory and then updated in ZAPs once per txg.

Test-Parameters: fortestonly envdefinitions=SLOW=yes,ENABLE_QUOTA=yes \
mdtfilesystemtype=zfs mdsfilesystemtype=zfs ostfilesystemtype=zfs testlist=sanity-quota
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Change-Id: Iebe96c9847c6c53b8232295a44fd8044b1ae18b4
Reviewed-on: http://review.whamcloud.com/7157
Tested-by: Jenkins
Reviewed-by: Johann Lombardi <johann.lombardi@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Isaac Huang <he.huang@intel.com>
libcfs/libcfs/hash.c
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_quota.c

index 0c4faf8..bfed025 100644 (file)
@@ -1035,8 +1035,10 @@ cfs_hash_create(char *name, unsigned cur_bits, unsigned max_bits,
         LASSERT(ops->hs_hash);
         LASSERT(ops->hs_object);
         LASSERT(ops->hs_keycmp);
-        LASSERT(ops->hs_get != NULL);
-        LASSERT(ops->hs_put_locked != NULL);
+       if ((flags & CFS_HASH_NO_ITEMREF) == 0) {
+               LASSERT(ops->hs_get != NULL);
+               LASSERT(ops->hs_put_locked != NULL);
+       }
 
         if ((flags & CFS_HASH_REHASH) != 0)
                 flags |= CFS_HASH_COUNTER; /* must have counter */
index 47bbc76..0dc8c2e 100644 (file)
@@ -532,6 +532,10 @@ static int osd_mount(const struct lu_env *env,
        strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
                sizeof(o->od_svname) - 1);
 
+       rc = osd_zfs_acct_init(env, o);
+       if (rc)
+               RETURN(rc);
+
        if (server_name_is_ost(o->od_svname))
                o->od_is_ost = 1;
 
@@ -617,6 +621,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
                CERROR("%s: lost %d pinned dbuf(s)\n", o->od_svname,
                       atomic_read(&o->od_zerocopy_pin));
 
+       osd_zfs_acct_fini(env, o);
+
        if (o->od_objset.os != NULL)
                udmu_objset_close(&o->od_objset);
 
index bc51cb1..63f680c 100644 (file)
@@ -82,6 +82,8 @@ struct osd_it_quota {
        zap_cursor_t            *oiq_zc;
        /** identifier for current quota record */
        __u64                    oiq_id;
+       /* the hash where object accounting is cached */
+       cfs_hash_t              *oiq_hash;
        unsigned                 oiq_reset:1; /* 1 -- no need to advance */
 };
 
@@ -227,6 +229,16 @@ struct osd_seq_list {
 #define OSD_OST_MAP_SIZE       32
 
 /*
+ * this structure tracks changes made to the accounting within specific TXG
+ */
+struct osd_zfs_acct_txg {
+       uint64_t                 zat_txg;
+       cfs_hash_t              *zat_usr;
+       cfs_hash_t              *zat_grp;
+       struct osd_device       *zat_osd;
+};
+
+/*
  * osd device.
  */
 struct osd_device {
@@ -279,6 +291,13 @@ struct osd_device {
        atomic_t                 od_zerocopy_pin;
 
        arc_prune_t             *arc_prune_cb;
+
+       /* quota: object accounting */
+       spinlock_t               od_known_txg_lock;
+       uint64_t                 od_known_txg;
+       struct osd_zfs_acct_txg *od_acct_delta;
+       cfs_hash_t              *od_acct_usr;
+       cfs_hash_t              *od_acct_grp;
 };
 
 struct osd_object {
@@ -313,7 +332,6 @@ struct osd_object {
 
 int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *);
 extern const struct dt_index_operations osd_acct_index_ops;
-uint64_t osd_quota_fid2dmu(const struct lu_fid *fid);
 extern struct lu_device_operations  osd_lu_ops;
 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
                      qid_t uid, qid_t gid, long long space,
@@ -496,6 +514,13 @@ osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
        return rc;
 }
 
+void osd_zfs_acct_uid(const struct lu_env *env, struct osd_device *osd,
+                    __u64 uid, int delta, struct osd_thandle *oh);
+void osd_zfs_acct_gid(const struct lu_env *env, struct osd_device *osd,
+                    __u64 gid, int delta, struct osd_thandle *oh);
+int osd_zfs_acct_init(const struct lu_env *env, struct osd_device *osd);
+void osd_zfs_acct_fini(const struct lu_env *env, struct osd_device *osd);
+
 static inline uint64_t attrs_fs2zfs(const uint32_t flags)
 {
        return (((flags & FS_APPEND_FL)         ? ZFS_APPENDONLY        : 0) |
index 89a91ea..0a4646b 100644 (file)
@@ -632,21 +632,9 @@ static int osd_object_destroy(const struct lu_env *env,
                GOTO(out, rc);
        }
 
-       /* Remove object from inode accounting. It is not fatal for the destroy
-        * operation if something goes wrong while updating accounting, but we
-        * still log an error message to notify the administrator */
-       rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-                       obj->oo_attr.la_uid, -1, oh->ot_tx);
-       if (rc)
-               CERROR("%s: failed to remove "DFID" from accounting ZAP for usr"
-                       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-                       obj->oo_attr.la_uid, rc);
-       rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-                               obj->oo_attr.la_gid, -1, oh->ot_tx);
-       if (rc)
-               CERROR("%s: failed to remove "DFID" from accounting ZAP for grp"
-                       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-                       obj->oo_attr.la_gid, rc);
+       /* do object accounting */
+       osd_zfs_acct_uid(env, osd, obj->oo_attr.la_uid, -1, oh);
+       osd_zfs_acct_gid(env, osd, obj->oo_attr.la_gid, -1, oh);
 
        /* kill object */
        rc = __osd_object_destroy(env, obj, oh->ot_tx, osd_obj_tag);
@@ -956,34 +944,14 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
 
        /* do both accounting updates outside oo_attr_lock below */
        if ((la->la_valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
-               /* Update user accounting. Failure isn't fatal, but we still
-                * log an error message */
-               rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-                                       la->la_uid, 1, oh->ot_tx);
-               if (rc)
-                       CERROR("%s: failed to update accounting ZAP for user "
-                               "%d (%d)\n", osd->od_svname, la->la_uid, rc);
-               rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-                                       obj->oo_attr.la_uid, -1, oh->ot_tx);
-               if (rc)
-                       CERROR("%s: failed to update accounting ZAP for user "
-                               "%d (%d)\n", osd->od_svname,
-                               obj->oo_attr.la_uid, rc);
+               /* do object accounting */
+               osd_zfs_acct_uid(env, osd, la->la_uid, 1, oh);
+               osd_zfs_acct_uid(env, osd, obj->oo_attr.la_uid, -1, oh);
        }
        if ((la->la_valid & LA_GID) && (la->la_gid != obj->oo_attr.la_gid)) {
-               /* Update group accounting. Failure isn't fatal, but we still
-                * log an error message */
-               rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-                                       la->la_gid, 1, oh->ot_tx);
-               if (rc)
-                       CERROR("%s: failed to update accounting ZAP for user "
-                               "%d (%d)\n", osd->od_svname, la->la_gid, rc);
-               rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-                                       obj->oo_attr.la_gid, -1, oh->ot_tx);
-               if (rc)
-                       CERROR("%s: failed to update accounting ZAP for user "
-                               "%d (%d)\n", osd->od_svname,
-                               obj->oo_attr.la_gid, rc);
+               /* do object accounting */
+               osd_zfs_acct_gid(env, osd, la->la_gid, 1, oh);
+               osd_zfs_acct_gid(env, osd, obj->oo_attr.la_gid, -1, oh);
        }
 
        write_lock(&obj->oo_attr_lock);
@@ -1505,18 +1473,9 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 
        /* Add new object to inode accounting.
         * Errors are not considered as fatal */
-       rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-                               (attr->la_valid & LA_UID) ? attr->la_uid : 0, 1,
-                               oh->ot_tx);
-       if (rc)
-               CERROR("%s: failed to add "DFID" to accounting ZAP for usr %d "
-                       "(%d)\n", osd->od_svname, PFID(fid), attr->la_uid, rc);
-       rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-                               (attr->la_valid & LA_GID) ? attr->la_gid : 0, 1,
-                               oh->ot_tx);
-       if (rc)
-               CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
-                       "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
+       /* XXX: UID/GID must be defined otherwise we can break accounting */
+       osd_zfs_acct_uid(env, osd, attr->la_uid, 1, oh);
+       osd_zfs_acct_gid(env, osd, attr->la_gid, 1, oh);
 
        /* configure new osd object */
        obj->oo_db = db;
index b51be3b..a27505b 100644 (file)
 #include <obd.h>
 #include "osd_internal.h"
 
+#include <sys/dnode.h>
+#include <sys/spa.h>
+#include <sys/zap.h>
+#include <sys/dmu_tx.h>
+#include <sys/dsl_prop.h>
+#include <sys/txg.h>
+
+/*
+ * the structure tracks per-ID change/state
+ */
+struct zfs_id_change {
+       struct hlist_node       zic_hash;
+       __u64                   zic_id;
+       atomic_t                zic_num;
+};
+
+/*
+ * callback data for cfs_hash_for_each_safe()
+ * used in txg commit and OSD cleanup path
+ */
+struct hash_cbdata {
+       struct osd_device       *hcb_osd;
+       uint64_t                 hcb_zapid;
+       dmu_tx_t                *hcb_tx;
+};
+
 /**
  * Helper function to retrieve DMU object id from fid for accounting object
  */
-uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
+static inline uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
 {
        LASSERT(fid_is_acct(fid));
        if (fid_oid(fid) == ACCT_GROUP_OID)
@@ -42,6 +68,473 @@ uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
        return DMU_USERUSED_OBJECT;
 }
 
+/*
+ * a note about locking:
+ *  entries in per-OSD cache never go before umount,
+ *  so there is no need in locking for lookups.
+ *
+ *  entries in per-txg deltas never go before txg is closed,
+ *  there is no concurrency between removal/insertions.
+ *
+ * also, given all above, there is no need in reference counting.
+ */
+static struct zfs_id_change *osd_zfs_lookup_by_id(cfs_hash_t *hash, __u64 id)
+{
+       struct zfs_id_change    *za = NULL;
+       struct hlist_node       *hnode;
+       cfs_hash_bd_t            bd;
+
+       cfs_hash_bd_get(hash, &id, &bd);
+       hnode = cfs_hash_bd_peek_locked(hash, &bd, &id);
+       if (hnode != NULL)
+               za = container_of0(hnode, struct zfs_id_change, zic_hash);
+
+       return za;
+}
+
+static struct zfs_id_change *lookup_or_create_by_id(struct osd_device *osd,
+                                               cfs_hash_t *hash, __u64 id)
+{
+       struct zfs_id_change    *za, *tmp;
+       struct hlist_node       *hnode;
+       cfs_hash_bd_t            bd;
+
+       za = osd_zfs_lookup_by_id(hash, id);
+       if (likely(za != NULL))
+               return za;
+
+       OBD_ALLOC_PTR(za);
+       if (unlikely(za == NULL))
+               return NULL;
+
+       za->zic_id = id;
+
+       cfs_hash_bd_get(hash, &id, &bd);
+       spin_lock(&osd->od_known_txg_lock);
+       hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
+       LASSERT(hnode != NULL);
+       tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
+       spin_unlock(&osd->od_known_txg_lock);
+
+       if (tmp == za) {
+               /*
+                * our structure got into the hash
+                */
+       } else {
+               /* somebody won the race, we wasted the cycles */
+               OBD_FREE_PTR(za);
+       }
+
+       return tmp;
+}
+
+/*
+ * used to maintain per-txg deltas
+ */
+static int osd_zfs_acct_id(const struct lu_env *env, cfs_hash_t *hash,
+                          __u64 id, int delta, struct osd_thandle *oh)
+{
+       struct osd_device       *osd = osd_dt_dev(oh->ot_super.th_dev);
+       struct zfs_id_change    *za;
+
+       LASSERT(hash);
+       LASSERT(oh->ot_tx);
+       LASSERT(oh->ot_tx->tx_txg == osd->od_known_txg);
+       LASSERT(osd->od_acct_delta != NULL);
+
+       za = lookup_or_create_by_id(osd, hash, id);
+       if (unlikely(za == NULL))
+               return -ENOMEM;
+
+       atomic_add(delta, &za->zic_num);
+
+       return 0;
+}
+
+/*
+ * this function is used to maintain current state for given ID:
+ * at the beginning it initializes the cache from correspoding ZAP
+ */
+static void osd_zfs_acct_cache_init(const struct lu_env *env,
+                                   struct osd_device *osd,
+                                   cfs_hash_t *hash, __u64 oid,
+                                   __u64 id, int delta,
+                                   struct osd_thandle *oh)
+{
+       char                    *buf  = osd_oti_get(env)->oti_buf;
+       struct hlist_node       *hnode;
+       cfs_hash_bd_t            bd;
+       struct zfs_id_change    *za, *tmp;
+       __u64                    v;
+       int                      rc;
+
+       za = osd_zfs_lookup_by_id(hash, id);
+       if (likely(za != NULL))
+               goto apply;
+
+       /*
+        * any concurrent thread is running in the same txg, so no on-disk
+        * accounting ZAP can be modified until this txg is closed
+        * thus all the concurrent threads must be getting the same value
+        * from that ZAP and we don't need to serialize lookups
+        */
+       snprintf(buf, sizeof(osd_oti_get(env)->oti_buf), "%llx", id);
+       /* XXX: we should be using zap_lookup_int_key(), but it consumes
+        *      20 bytes on the stack for buf .. */
+       rc = -zap_lookup(osd->od_objset.os, oid, buf, sizeof(uint64_t), 1, &v);
+       if (rc == -ENOENT) {
+               v = 0;
+       } else if (unlikely(rc != 0)) {
+               CERROR("%s: can't access accounting zap %llu\n",
+                      osd->od_svname, oid);
+               return;
+       }
+
+       OBD_ALLOC_PTR(za);
+       if (unlikely(za == NULL)) {
+               CERROR("%s: can't allocate za\n", osd->od_svname);
+               return;
+       }
+
+       za->zic_id = id;
+       atomic_set(&za->zic_num, v);
+
+       cfs_hash_bd_get(hash, &id, &bd);
+       spin_lock(&osd->od_known_txg_lock);
+       hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
+       LASSERT(hnode != NULL);
+       tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
+       spin_unlock(&osd->od_known_txg_lock);
+
+       if (tmp == za) {
+               /* our structure got into the hash */
+               if (rc == -ENOENT) {
+                       /* there was no entry in ZAP yet, we have
+                        * to initialize with 0, so that accounting
+                        * reports can find that and then find our
+                        * cached value. */
+                       v = 0;
+                       rc = -zap_update(osd->od_objset.os, oid, buf,
+                                        sizeof(uint64_t), 1, &v, oh->ot_tx);
+                       if (unlikely(rc != 0))
+                               CERROR("%s: can't initialize: rc = %d\n",
+                                      osd->od_svname, rc);
+               }
+       } else {
+               /* somebody won the race, we wasted the cycles */
+               OBD_FREE_PTR(za);
+               za = tmp;
+       }
+
+apply:
+       LASSERT(za != NULL);
+       atomic_add(delta, &za->zic_num);
+}
+
+static __u32 acct_hashfn(cfs_hash_t *hash_body, const void *key, unsigned mask)
+{
+       const __u64     *id = key;
+       __u32            result;
+
+       result = (__u32) *id;
+       return result % mask;
+}
+
+static void *acct_key(struct hlist_node *hnode)
+{
+       struct zfs_id_change    *ac;
+
+       ac = hlist_entry(hnode, struct zfs_id_change, zic_hash);
+       return &ac->zic_id;
+}
+
+static int acct_hashkey_keycmp(const void *key,
+                              struct hlist_node *compared_hnode)
+{
+       struct zfs_id_change    *ac;
+       const __u64             *id = key;
+
+       ac = hlist_entry(compared_hnode, struct zfs_id_change, zic_hash);
+       return *id == ac->zic_id;
+}
+
+static void *acct_hashobject(struct hlist_node *hnode)
+{
+       return hlist_entry(hnode, struct zfs_id_change, zic_hash);
+}
+
+static cfs_hash_ops_t acct_hash_operations = {
+       .hs_hash        = acct_hashfn,
+       .hs_key         = acct_key,
+       .hs_keycmp      = acct_hashkey_keycmp,
+       .hs_object      = acct_hashobject,
+};
+
+#define ACCT_HASH_OPS (CFS_HASH_NO_LOCK|CFS_HASH_NO_ITEMREF|CFS_HASH_ADD_TAIL)
+
+int osd_zfs_acct_init(const struct lu_env *env, struct osd_device *o)
+{
+       int rc = 0;
+       ENTRY;
+
+       spin_lock_init(&o->od_known_txg_lock);
+
+       /* global structure representing current state for given ID */
+       o->od_acct_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
+                                        &acct_hash_operations,
+                                        ACCT_HASH_OPS);
+       if (o->od_acct_usr == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+       o->od_acct_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
+                                        &acct_hash_operations,
+                                        ACCT_HASH_OPS);
+       if (o->od_acct_grp == NULL)
+               GOTO(out, rc = -ENOMEM);
+
+out:
+       RETURN(rc);
+}
+
+static int osd_zfs_delete_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                              struct hlist_node *node, void *data)
+{
+       struct hash_cbdata      *d = data;
+       struct zfs_id_change    *za;
+       __u64                    v;
+       char                     buf[12];
+       int                      rc;
+
+       za = hlist_entry(node, struct zfs_id_change, zic_hash);
+
+       /*
+        * XXX: should we try to fix accounting we failed to update before?
+        */
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 5, 70, 0)
+       /*
+        * extra checks to ensure our cache matches on-disk state
+        */
+       snprintf(buf, sizeof(buf), "%llx", za->zic_id);
+       rc = -zap_lookup(d->hcb_osd->od_objset.os, d->hcb_zapid,
+                        buf, sizeof(uint64_t), 1, &v);
+       /* pairs with zero value are removed by ZAP automatically */
+       if (rc == -ENOENT)
+               v = 0;
+       if (atomic_read(&za->zic_num) != v) {
+               CERROR("%s: INVALID ACCOUNTING FOR %llu %d != %lld: rc = %d\n",
+                      d->hcb_osd->od_svname, za->zic_id,
+                      atomic_read(&za->zic_num), v, rc);
+               /* XXX: to catch with automated testing */
+               LBUG();
+       }
+#else
+#warning "remove this additional check before release"
+#endif
+
+       cfs_hash_bd_del_locked(hs, bd, node);
+       OBD_FREE_PTR(za);
+
+       return 0;
+}
+
+void osd_zfs_acct_fini(const struct lu_env *env, struct osd_device *o)
+{
+       struct hash_cbdata      cbdata;
+
+       cbdata.hcb_osd = o;
+
+       /* release object accounting cache (owners) */
+       cbdata.hcb_zapid = o->od_iusr_oid;
+
+       if (o->od_acct_usr) {
+               cfs_hash_for_each_safe(o->od_acct_usr, osd_zfs_delete_item,
+                                      &cbdata);
+               cfs_hash_putref(o->od_acct_usr);
+               o->od_acct_usr = NULL;
+       }
+
+       /* release object accounting cache (groups) */
+       cbdata.hcb_zapid = o->od_igrp_oid;
+
+       if (o->od_acct_grp) {
+               cfs_hash_for_each_safe(o->od_acct_grp, osd_zfs_delete_item,
+                                      &cbdata);
+               cfs_hash_putref(o->od_acct_grp);
+               o->od_acct_grp = NULL;
+       }
+}
+
+static int osd_zfs_commit_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+                              struct hlist_node *node, void *data)
+{
+       struct hash_cbdata      *d = data;
+       struct osd_device       *osd = d->hcb_osd;
+       struct zfs_id_change    *za;
+       int                      rc;
+
+       za = hlist_entry(node, struct zfs_id_change, zic_hash);
+
+       rc = -zap_increment_int(osd->od_objset.os, d->hcb_zapid, za->zic_id,
+                               atomic_read(&za->zic_num), d->hcb_tx);
+       if (unlikely(rc != 0))
+               CERROR("%s: quota update for UID "LPU64" failed: rc = %d\n",
+                      osd->od_svname, za->zic_id, rc);
+
+       cfs_hash_bd_del_locked(hs, bd, node);
+       OBD_FREE_PTR(za);
+
+       return 0;
+}
+
+/*
+ * this function is called as part of txg commit procedure,
+ * no more normal changes are allowed to this txg.
+ * we go over all the changes cached in per-txg structure
+ * and apply them to actual ZAPs
+ */
+static void osd_zfs_acct_update(void *arg, void *arg2, dmu_tx_t *tx)
+{
+       struct osd_zfs_acct_txg *zat = arg;
+       struct osd_device       *osd = zat->zat_osd;
+       struct hash_cbdata       cbdata;
+
+       cbdata.hcb_osd = osd;
+       cbdata.hcb_tx = tx;
+
+       CDEBUG(D_OTHER, "COMMIT %llu on %s\n", tx->tx_txg, osd->od_svname);
+
+       /* apply changes related to the owners */
+       cbdata.hcb_zapid = osd->od_iusr_oid;
+       cfs_hash_for_each_safe(zat->zat_usr, osd_zfs_commit_item, &cbdata);
+
+       /* apply changes related to the groups */
+       cbdata.hcb_zapid = osd->od_igrp_oid;
+       cfs_hash_for_each_safe(zat->zat_grp, osd_zfs_commit_item, &cbdata);
+
+       cfs_hash_putref(zat->zat_usr);
+       cfs_hash_putref(zat->zat_grp);
+
+       OBD_FREE_PTR(zat);
+}
+
+static int osd_zfs_acct_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+       /* check function isn't used currently */
+       return 0;
+}
+
+/*
+ * if any change to the object accounting is going to happen,
+ * we create one structure per txg to track all the changes
+ * and register special routine to be called as part of txg
+ * commit procedure.
+ */
+int osd_zfs_acct_trans_start(const struct lu_env *env, struct osd_thandle *oh)
+{
+       struct osd_device       *osd = osd_dt_dev(oh->ot_super.th_dev);
+       struct osd_zfs_acct_txg *ac = NULL;
+       int                      rc = 0, add_work = 0;
+
+       if (likely(oh->ot_tx->tx_txg == osd->od_known_txg)) {
+               /* already created */
+               return 0;
+       }
+
+       OBD_ALLOC_PTR(ac);
+       if (unlikely(ac == NULL))
+               return -ENOMEM;
+
+       ac->zat_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
+                                     &acct_hash_operations,
+                                     ACCT_HASH_OPS);
+       if (unlikely(ac->zat_usr == NULL)) {
+               CERROR("%s: can't allocate hash for accounting\n",
+                       osd->od_svname);
+               GOTO(out, rc = -ENOMEM);
+       }
+
+       ac->zat_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
+                                     &acct_hash_operations,
+                                     ACCT_HASH_OPS);
+       if (unlikely(ac->zat_grp == NULL)) {
+               CERROR("%s: can't allocate hash for accounting\n",
+                       osd->od_svname);
+               GOTO(out, rc = -ENOMEM);
+       }
+
+       spin_lock(&osd->od_known_txg_lock);
+       if (oh->ot_tx->tx_txg != osd->od_known_txg) {
+               osd->od_acct_delta = ac;
+               osd->od_known_txg = oh->ot_tx->tx_txg;
+               add_work = 1;
+       }
+       spin_unlock(&osd->od_known_txg_lock);
+
+       /* schedule a callback to be run in the context of txg
+        * once the latter is closed and syncing */
+       if (add_work) {
+               spa_t *spa = dmu_objset_spa(osd->od_objset.os);
+               LASSERT(ac->zat_osd == NULL);
+               ac->zat_osd = osd;
+               dsl_sync_task_do_nowait(spa_get_dsl(spa),
+                                       osd_zfs_acct_check,
+                                       osd_zfs_acct_update,
+                                       ac, NULL, 128, oh->ot_tx);
+
+               /* no to be freed now */
+               ac = NULL;
+       }
+
+out:
+       if (ac != NULL) {
+               /* another thread has installed new structure already */
+               if (ac->zat_usr)
+                       cfs_hash_putref(ac->zat_usr);
+               if (ac->zat_grp)
+                       cfs_hash_putref(ac->zat_grp);
+               OBD_FREE_PTR(ac);
+       }
+
+       return rc;
+}
+
+void osd_zfs_acct_uid(const struct lu_env *env, struct osd_device *osd,
+                     __u64 uid, int delta, struct osd_thandle *oh)
+{
+       int rc;
+
+       /* add per-txg job to update accounting */
+       rc = osd_zfs_acct_trans_start(env, oh);
+       if (unlikely(rc != 0))
+               return;
+
+       /* maintain per-OSD cached value */
+       osd_zfs_acct_cache_init(env, osd, osd->od_acct_usr,
+                               osd->od_iusr_oid, uid, delta, oh);
+
+       /* maintain per-TXG delta */
+       osd_zfs_acct_id(env, osd->od_acct_delta->zat_usr, uid, delta, oh);
+
+}
+
+void osd_zfs_acct_gid(const struct lu_env *env, struct osd_device *osd,
+                     __u64 gid, int delta, struct osd_thandle *oh)
+{
+       int rc;
+
+       /* add per-txg job to update accounting */
+       rc = osd_zfs_acct_trans_start(env, oh);
+       if (unlikely(rc != 0))
+               return;
+
+       /* maintain per-OSD cached value */
+       osd_zfs_acct_cache_init(env, osd, osd->od_acct_grp,
+                               osd->od_igrp_oid, gid, delta, oh);
+
+       /* maintain per-TXG delta */
+       osd_zfs_acct_id(env, osd->od_acct_delta->zat_grp, gid, delta, oh);
+}
+
 /**
  * Space Accounting Management
  */
@@ -63,18 +556,19 @@ uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
  * \retval -ve - failure
  */
 static int osd_acct_index_lookup(const struct lu_env *env,
-                               struct dt_object *dtobj,
-                               struct dt_rec *dtrec,
-                               const struct dt_key *dtkey,
-                               struct lustre_capa *capa)
+                                struct dt_object *dtobj,
+                                struct dt_rec *dtrec,
+                                const struct dt_key *dtkey,
+                                struct lustre_capa *capa)
 {
        struct osd_thread_info  *info = osd_oti_get(env);
        char                    *buf  = info->oti_buf;
        struct lquota_acct_rec  *rec  = (struct lquota_acct_rec *)dtrec;
        struct osd_object       *obj = osd_dt_obj(dtobj);
        struct osd_device       *osd = osd_obj2dev(obj);
-       int                      rc;
        uint64_t                 oid;
+       struct zfs_id_change    *za = NULL;
+       int                      rc;
        ENTRY;
 
        rec->bspace = rec->ispace = 0;
@@ -109,8 +603,20 @@ static int osd_acct_index_lookup(const struct lu_env *env,
 
        /* as for inode accounting, it is not maintained by DMU, so we just
         * use our own ZAP to track inode usage */
-       rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
-                        buf, sizeof(uint64_t), 1, &rec->ispace);
+       if (oid == DMU_USERUSED_OBJECT) {
+               za = osd_zfs_lookup_by_id(osd->od_acct_usr,
+                                        *((__u64 *)dtkey));
+       } else if (oid == DMU_GROUPUSED_OBJECT) {
+               za = osd_zfs_lookup_by_id(osd->od_acct_grp,
+                                        *((__u64 *)dtkey));
+       }
+       if (za) {
+               rec->ispace = atomic_read(&za->zic_num);
+       } else {
+               rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
+                               buf, sizeof(uint64_t), 1, &rec->ispace);
+       }
+
        if (rc == -ENOENT)
                /* user/group has not created any file yet */
                CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
@@ -149,6 +655,13 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
        memset(it, 0, sizeof(*it));
        it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
 
+       if (it->oiq_oid == DMU_GROUPUSED_OBJECT)
+               it->oiq_hash = osd->od_acct_grp;
+       else if (it->oiq_oid == DMU_USERUSED_OBJECT)
+               it->oiq_hash = osd->od_acct_usr;
+       else
+               LBUG();
+
        /* initialize zap cursor */
        rc = -udmu_zap_cursor_init(&it->oiq_zc, &osd->od_objset, it->oiq_oid,0);
        if (rc)
@@ -252,6 +765,7 @@ static int osd_it_acct_rec(const struct lu_env *env,
        struct osd_object       *obj = it->oiq_obj;
        struct osd_device       *osd = osd_obj2dev(obj);
        int                      bytes_read;
+       struct zfs_id_change    *za;
        int                      rc;
        ENTRY;
 
@@ -280,16 +794,23 @@ static int osd_it_acct_rec(const struct lu_env *env,
 
        /* inode accounting is not maintained by DMU, so we use our own ZAP to
         * track inode usage */
-       rc = -zap_lookup(osd->od_objset.os, it->oiq_obj->oo_db->db_object,
-                        buf, sizeof(uint64_t), 1, &rec->ispace);
-       if (rc == -ENOENT)
-               /* user/group has not created any file yet */
-               CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
-                      osd->od_svname, buf);
-       else if (rc)
-               RETURN(rc);
+       za = osd_zfs_lookup_by_id(it->oiq_hash, it->oiq_id);
+       if (za != NULL) {
+               /* found in the cache */
+               rec->ispace = atomic_read(&za->zic_num);
+       } else {
+                rc = -zap_lookup(osd->od_objset.os,
+                                 it->oiq_obj->oo_db->db_object,
+                                 buf, sizeof(uint64_t), 1, &rec->ispace);
+                if (rc == -ENOENT) {
+                       /* user/group has not created any file yet */
+                       CDEBUG(D_QUOTA, "%s: id %s not found in ZAP\n",
+                              osd->od_svname, buf);
+                       rc = 0;
+               }
+       }
 
-       RETURN(0);
+       RETURN(rc);
 }
 
 /**