From: Alex Zhuravlev
Date: Sun, 28 Jul 2013 18:12:29 +0000 (+0400)
Subject: LU-2600 osd-zfs: batched object accounting
X-Git-Tag: 2.5.59~106
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=72accbebccb6a542b327e45e9a0903b2951d4bd5;p=fs%2Flustre-release.git

LU-2600 osd-zfs: batched object accounting

Using dsl_sync_task_do_nowait() we can schedule the updates to be
executed in a closing txg. This way object accounting can be done
in memory and then updated in the ZAPs once per txg.

Test-Parameters: fortestonly envdefinitions=SLOW=yes,ENABLE_QUOTA=yes \
mdtfilesystemtype=zfs mdsfilesystemtype=zfs ostfilesystemtype=zfs testlist=sanity-quota

Signed-off-by: Alex Zhuravlev
Change-Id: Iebe96c9847c6c53b8232295a44fd8044b1ae18b4
Reviewed-on: http://review.whamcloud.com/7157
Tested-by: Jenkins
Reviewed-by: Johann Lombardi
Reviewed-by: Niu Yawei
Reviewed-by: Mike Pershin
Reviewed-by: Isaac Huang
---

diff --git a/libcfs/libcfs/hash.c b/libcfs/libcfs/hash.c
index 0c4faf8..bfed025 100644
--- a/libcfs/libcfs/hash.c
+++ b/libcfs/libcfs/hash.c
@@ -1035,8 +1035,10 @@ cfs_hash_create(char *name, unsigned cur_bits, unsigned max_bits,
 	LASSERT(ops->hs_hash);
 	LASSERT(ops->hs_object);
 	LASSERT(ops->hs_keycmp);
-	LASSERT(ops->hs_get != NULL);
-	LASSERT(ops->hs_put_locked != NULL);
+	if ((flags & CFS_HASH_NO_ITEMREF) == 0) {
+		LASSERT(ops->hs_get != NULL);
+		LASSERT(ops->hs_put_locked != NULL);
+	}
 
 	if ((flags & CFS_HASH_REHASH) != 0)
 		flags |= CFS_HASH_COUNTER; /* must have counter */
diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c
index 47bbc76..0dc8c2e 100644
--- a/lustre/osd-zfs/osd_handler.c
+++ b/lustre/osd-zfs/osd_handler.c
@@ -532,6 +532,10 @@ static int osd_mount(const struct lu_env *env,
 	strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
 		sizeof(o->od_svname) - 1);
 
+	rc = osd_zfs_acct_init(env, o);
+	if (rc)
+		RETURN(rc);
+
 	if (server_name_is_ost(o->od_svname))
 		o->od_is_ost = 1;
 
@@ -617,6 +621,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
 		CERROR("%s: lost %d pinned dbuf(s)\n", o->od_svname,
 		       atomic_read(&o->od_zerocopy_pin));
 
+	osd_zfs_acct_fini(env, o);
+
 	if (o->od_objset.os != NULL)
 		udmu_objset_close(&o->od_objset);
 
diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h
index bc51cb1..63f680c 100644
--- a/lustre/osd-zfs/osd_internal.h
+++ b/lustre/osd-zfs/osd_internal.h
@@ -82,6 +82,8 @@ struct osd_it_quota {
 	zap_cursor_t		*oiq_zc;
 	/** identifier for current quota record */
 	__u64			 oiq_id;
+	/* the hash where object accounting is cached */
+	cfs_hash_t		*oiq_hash;
 	unsigned		 oiq_reset:1; /* 1 -- no need to advance */
 };
 
@@ -227,6 +229,16 @@ struct osd_seq_list {
 #define OSD_OST_MAP_SIZE	32
 
 /*
+ * this structure tracks changes made to the accounting within a specific TXG
+ */
+struct osd_zfs_acct_txg {
+	uint64_t		 zat_txg;
+	cfs_hash_t		*zat_usr;
+	cfs_hash_t		*zat_grp;
+	struct osd_device	*zat_osd;
+};
+
+/*
  * osd device.
  */
 struct osd_device {
@@ -279,6 +291,13 @@ struct osd_device {
 	atomic_t		 od_zerocopy_pin;
 
 	arc_prune_t		*arc_prune_cb;
+
+	/* quota: object accounting */
+	spinlock_t		 od_known_txg_lock;
+	uint64_t		 od_known_txg;
+	struct osd_zfs_acct_txg	*od_acct_delta;
+	cfs_hash_t		*od_acct_usr;
+	cfs_hash_t		*od_acct_grp;
 };
 
 struct osd_object {
@@ -313,7 +332,6 @@ struct osd_object {
 int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *);
 extern const struct dt_index_operations osd_acct_index_ops;
-uint64_t osd_quota_fid2dmu(const struct lu_fid *fid);
 extern struct lu_device_operations osd_lu_ops;
 int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
 		      qid_t uid, qid_t gid, long long space,
@@ -496,6 +514,13 @@ osd_xattr_set_internal(const struct lu_env *env, struct osd_object *obj,
 	return rc;
 }
 
+void osd_zfs_acct_uid(const struct lu_env *env, struct osd_device *osd,
+		      __u64 uid, int delta, struct osd_thandle *oh);
+void osd_zfs_acct_gid(const struct lu_env *env, struct osd_device *osd,
+		      __u64 gid, int delta, struct osd_thandle *oh);
+int osd_zfs_acct_init(const struct lu_env *env, struct osd_device *osd);
+void osd_zfs_acct_fini(const struct lu_env *env, struct osd_device *osd);
+
 static inline uint64_t attrs_fs2zfs(const uint32_t flags)
 {
 	return (((flags & FS_APPEND_FL) ? ZFS_APPENDONLY : 0) |
diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c
index 89a91ea..0a4646b 100644
--- a/lustre/osd-zfs/osd_object.c
+++ b/lustre/osd-zfs/osd_object.c
@@ -632,21 +632,9 @@ static int osd_object_destroy(const struct lu_env *env,
 		GOTO(out, rc);
 	}
 
-	/* Remove object from inode accounting. It is not fatal for the destroy
-	 * operation if something goes wrong while updating accounting, but we
-	 * still log an error message to notify the administrator */
-	rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-				obj->oo_attr.la_uid, -1, oh->ot_tx);
-	if (rc)
-		CERROR("%s: failed to remove "DFID" from accounting ZAP for usr"
-		       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-		       obj->oo_attr.la_uid, rc);
-	rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-				obj->oo_attr.la_gid, -1, oh->ot_tx);
-	if (rc)
-		CERROR("%s: failed to remove "DFID" from accounting ZAP for grp"
-		       " %d: rc = %d\n", osd->od_svname, PFID(fid),
-		       obj->oo_attr.la_gid, rc);
+	/* do object accounting */
+	osd_zfs_acct_uid(env, osd, obj->oo_attr.la_uid, -1, oh);
+	osd_zfs_acct_gid(env, osd, obj->oo_attr.la_gid, -1, oh);
 
 	/* kill object */
 	rc = __osd_object_destroy(env, obj, oh->ot_tx, osd_obj_tag);
@@ -956,34 +944,14 @@ static int osd_attr_set(const struct lu_env *env, struct dt_object *dt,
 
 	/* do both accounting updates outside oo_attr_lock below */
 	if ((la->la_valid & LA_UID) && (la->la_uid != obj->oo_attr.la_uid)) {
-		/* Update user accounting. Failure isn't fatal, but we still
-		 * log an error message */
-		rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-					la->la_uid, 1, oh->ot_tx);
-		if (rc)
-			CERROR("%s: failed to update accounting ZAP for user "
-			       "%d (%d)\n", osd->od_svname, la->la_uid, rc);
-		rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-					obj->oo_attr.la_uid, -1, oh->ot_tx);
-		if (rc)
-			CERROR("%s: failed to update accounting ZAP for user "
-			       "%d (%d)\n", osd->od_svname,
-			       obj->oo_attr.la_uid, rc);
+		/* do object accounting */
+		osd_zfs_acct_uid(env, osd, la->la_uid, 1, oh);
+		osd_zfs_acct_uid(env, osd, obj->oo_attr.la_uid, -1, oh);
 	}
 	if ((la->la_valid & LA_GID) && (la->la_gid != obj->oo_attr.la_gid)) {
-		/* Update group accounting. Failure isn't fatal, but we still
-		 * log an error message */
-		rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-					la->la_gid, 1, oh->ot_tx);
-		if (rc)
-			CERROR("%s: failed to update accounting ZAP for user "
-			       "%d (%d)\n", osd->od_svname, la->la_gid, rc);
-		rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-					obj->oo_attr.la_gid, -1, oh->ot_tx);
-		if (rc)
-			CERROR("%s: failed to update accounting ZAP for user "
-			       "%d (%d)\n", osd->od_svname,
-			       obj->oo_attr.la_gid, rc);
+		/* do object accounting */
+		osd_zfs_acct_gid(env, osd, la->la_gid, 1, oh);
+		osd_zfs_acct_gid(env, osd, obj->oo_attr.la_gid, -1, oh);
 	}
 
 	write_lock(&obj->oo_attr_lock);
@@ -1505,18 +1473,9 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 
 	/* Add new object to inode accounting.
 	 * Errors are not considered as fatal */
-	rc = -zap_increment_int(osd->od_objset.os, osd->od_iusr_oid,
-				(attr->la_valid & LA_UID) ? attr->la_uid : 0, 1,
-				oh->ot_tx);
-	if (rc)
-		CERROR("%s: failed to add "DFID" to accounting ZAP for usr %d "
-		       "(%d)\n", osd->od_svname, PFID(fid), attr->la_uid, rc);
-	rc = -zap_increment_int(osd->od_objset.os, osd->od_igrp_oid,
-				(attr->la_valid & LA_GID) ? attr->la_gid : 0, 1,
-				oh->ot_tx);
-	if (rc)
-		CERROR("%s: failed to add "DFID" to accounting ZAP for grp %d "
-		       "(%d)\n", osd->od_svname, PFID(fid), attr->la_gid, rc);
+	/* XXX: UID/GID must be defined, otherwise we can break accounting */
+	osd_zfs_acct_uid(env, osd, attr->la_uid, 1, oh);
+	osd_zfs_acct_gid(env, osd, attr->la_gid, 1, oh);
 
 	/* configure new osd object */
 	obj->oo_db = db;
diff --git a/lustre/osd-zfs/osd_quota.c b/lustre/osd-zfs/osd_quota.c
index b51be3b..a27505b 100644
--- a/lustre/osd-zfs/osd_quota.c
+++ b/lustre/osd-zfs/osd_quota.c
@@ -31,10 +31,36 @@
 #include
 #include "osd_internal.h"
 
+#include
+#include
+#include
+#include
+#include
+#include
+
+/*
+ * this structure tracks per-ID change/state
+ */
+struct zfs_id_change {
+	struct hlist_node	zic_hash;
+	__u64			zic_id;
+	atomic_t		zic_num;
+};
+
+/*
+ * callback data for cfs_hash_for_each_safe()
+ * used in txg commit and OSD cleanup path
+ */
+struct hash_cbdata {
+	struct osd_device	*hcb_osd;
+	uint64_t		 hcb_zapid;
+	dmu_tx_t		*hcb_tx;
+};
+
 /**
  * Helper function to retrieve DMU object id from fid for accounting object
  */
-uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
+static inline uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
 {
 	LASSERT(fid_is_acct(fid));
 	if (fid_oid(fid) == ACCT_GROUP_OID)
@@ -42,6 +68,473 @@ uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
 	return DMU_USERUSED_OBJECT;
 }
 
+/*
+ * a note about locking:
+ * entries in the per-OSD cache never go away before umount,
+ * so there is no need for locking in lookups.
+ *
+ * entries in the per-txg deltas never go away before the txg is closed,
+ * so there is no concurrency between removals and insertions.
+ *
+ * also, given all of the above, there is no need for reference counting.
+ */
+static struct zfs_id_change *osd_zfs_lookup_by_id(cfs_hash_t *hash, __u64 id)
+{
+	struct zfs_id_change	*za = NULL;
+	struct hlist_node	*hnode;
+	cfs_hash_bd_t		 bd;
+
+	cfs_hash_bd_get(hash, &id, &bd);
+	hnode = cfs_hash_bd_peek_locked(hash, &bd, &id);
+	if (hnode != NULL)
+		za = container_of0(hnode, struct zfs_id_change, zic_hash);
+
+	return za;
+}
+
+static struct zfs_id_change *lookup_or_create_by_id(struct osd_device *osd,
+						    cfs_hash_t *hash, __u64 id)
+{
+	struct zfs_id_change	*za, *tmp;
+	struct hlist_node	*hnode;
+	cfs_hash_bd_t		 bd;
+
+	za = osd_zfs_lookup_by_id(hash, id);
+	if (likely(za != NULL))
+		return za;
+
+	OBD_ALLOC_PTR(za);
+	if (unlikely(za == NULL))
+		return NULL;
+
+	za->zic_id = id;
+
+	cfs_hash_bd_get(hash, &id, &bd);
+	spin_lock(&osd->od_known_txg_lock);
+	hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
+	LASSERT(hnode != NULL);
+	tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
+	spin_unlock(&osd->od_known_txg_lock);
+
+	if (tmp == za) {
+		/*
+		 * our structure got into the hash
+		 */
+	} else {
+		/* somebody won the race, we wasted the cycles */
+		OBD_FREE_PTR(za);
+	}
+
+	return tmp;
+}
+
+/*
+ * used to maintain per-txg deltas
+ */
+static int osd_zfs_acct_id(const struct lu_env *env, cfs_hash_t *hash,
+			   __u64 id, int delta, struct osd_thandle *oh)
+{
+	struct osd_device	*osd = osd_dt_dev(oh->ot_super.th_dev);
+	struct zfs_id_change	*za;
+
+	LASSERT(hash);
+	LASSERT(oh->ot_tx);
+	LASSERT(oh->ot_tx->tx_txg == osd->od_known_txg);
+	LASSERT(osd->od_acct_delta != NULL);
+
+	za = lookup_or_create_by_id(osd, hash, id);
+	if (unlikely(za == NULL))
+		return -ENOMEM;
+
+	atomic_add(delta, &za->zic_num);
+
+	return 0;
+}
+
+/*
+ * this function is used to maintain the current state for a given ID:
+ * at the beginning it initializes the cache from the corresponding ZAP
+ */
+static void osd_zfs_acct_cache_init(const struct lu_env *env,
+				    struct osd_device *osd,
+				    cfs_hash_t *hash, __u64 oid,
+				    __u64 id, int delta,
+				    struct osd_thandle *oh)
+{
+	char			*buf = osd_oti_get(env)->oti_buf;
+	struct hlist_node	*hnode;
+	cfs_hash_bd_t		 bd;
+	struct zfs_id_change	*za, *tmp;
+	__u64			 v;
+	int			 rc;
+
+	za = osd_zfs_lookup_by_id(hash, id);
+	if (likely(za != NULL))
+		goto apply;
+
+	/*
+	 * any concurrent thread is running in the same txg, so no on-disk
+	 * accounting ZAP can be modified until this txg is closed;
+	 * thus all the concurrent threads must be getting the same value
+	 * from that ZAP and we don't need to serialize lookups
+	 */
+	snprintf(buf, sizeof(osd_oti_get(env)->oti_buf), "%llx", id);
+	/* XXX: we should be using zap_lookup_int_key(), but it consumes
+	 * 20 bytes on the stack for buf .. */
+	rc = -zap_lookup(osd->od_objset.os, oid, buf, sizeof(uint64_t), 1, &v);
+	if (rc == -ENOENT) {
+		v = 0;
+	} else if (unlikely(rc != 0)) {
+		CERROR("%s: can't access accounting zap %llu\n",
+		       osd->od_svname, oid);
+		return;
+	}
+
+	OBD_ALLOC_PTR(za);
+	if (unlikely(za == NULL)) {
+		CERROR("%s: can't allocate za\n", osd->od_svname);
+		return;
+	}
+
+	za->zic_id = id;
+	atomic_set(&za->zic_num, v);
+
+	cfs_hash_bd_get(hash, &id, &bd);
+	spin_lock(&osd->od_known_txg_lock);
+	hnode = cfs_hash_bd_findadd_locked(hash, &bd, &id, &za->zic_hash, 1);
+	LASSERT(hnode != NULL);
+	tmp = container_of0(hnode, struct zfs_id_change, zic_hash);
+	spin_unlock(&osd->od_known_txg_lock);
+
+	if (tmp == za) {
+		/* our structure got into the hash */
+		if (rc == -ENOENT) {
+			/* there was no entry in ZAP yet, we have
+			 * to initialize with 0, so that accounting
+			 * reports can find that and then find our
+			 * cached value. */
+			v = 0;
+			rc = -zap_update(osd->od_objset.os, oid, buf,
+					 sizeof(uint64_t), 1, &v, oh->ot_tx);
+			if (unlikely(rc != 0))
+				CERROR("%s: can't initialize: rc = %d\n",
+				       osd->od_svname, rc);
+		}
+	} else {
+		/* somebody won the race, we wasted the cycles */
+		OBD_FREE_PTR(za);
+		za = tmp;
+	}
+
+apply:
+	LASSERT(za != NULL);
+	atomic_add(delta, &za->zic_num);
+}
+
+static __u32 acct_hashfn(cfs_hash_t *hash_body, const void *key, unsigned mask)
+{
+	const __u64	*id = key;
+	__u32		 result;
+
+	result = (__u32) *id;
+	return result % mask;
+}
+
+static void *acct_key(struct hlist_node *hnode)
+{
+	struct zfs_id_change	*ac;
+
+	ac = hlist_entry(hnode, struct zfs_id_change, zic_hash);
+	return &ac->zic_id;
+}
+
+static int acct_hashkey_keycmp(const void *key,
+			       struct hlist_node *compared_hnode)
+{
+	struct zfs_id_change	*ac;
+	const __u64		*id = key;
+
+	ac = hlist_entry(compared_hnode, struct zfs_id_change, zic_hash);
+	return *id == ac->zic_id;
+}
+
+static void *acct_hashobject(struct hlist_node *hnode)
+{
+	return hlist_entry(hnode, struct zfs_id_change, zic_hash);
+}
+
+static cfs_hash_ops_t acct_hash_operations = {
+	.hs_hash	= acct_hashfn,
+	.hs_key		= acct_key,
+	.hs_keycmp	= acct_hashkey_keycmp,
+	.hs_object	= acct_hashobject,
+};
+
+#define ACCT_HASH_OPS (CFS_HASH_NO_LOCK|CFS_HASH_NO_ITEMREF|CFS_HASH_ADD_TAIL)
+
+int osd_zfs_acct_init(const struct lu_env *env, struct osd_device *o)
+{
+	int rc = 0;
+	ENTRY;
+
+	spin_lock_init(&o->od_known_txg_lock);
+
+	/* global structure representing current state for given ID */
+	o->od_acct_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
+					 &acct_hash_operations,
+					 ACCT_HASH_OPS);
+	if (o->od_acct_usr == NULL)
+		GOTO(out, rc = -ENOMEM);
+
+	o->od_acct_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
+					 &acct_hash_operations,
+					 ACCT_HASH_OPS);
+	if (o->od_acct_grp == NULL)
+		GOTO(out, rc = -ENOMEM);
+
+out:
+	RETURN(rc);
+}
+
+static int osd_zfs_delete_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+			       struct hlist_node *node, void *data)
+{
+	struct hash_cbdata	*d = data;
+	struct zfs_id_change	*za;
+	__u64			 v;
+	char			 buf[12];
+	int			 rc;
+
+	za = hlist_entry(node, struct zfs_id_change, zic_hash);
+
+	/*
+	 * XXX: should we try to fix accounting we failed to update before?
+	 */
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 5, 70, 0)
+	/*
+	 * extra checks to ensure our cache matches on-disk state
+	 */
+	snprintf(buf, sizeof(buf), "%llx", za->zic_id);
+	rc = -zap_lookup(d->hcb_osd->od_objset.os, d->hcb_zapid,
+			 buf, sizeof(uint64_t), 1, &v);
+	/* pairs with zero value are removed by ZAP automatically */
+	if (rc == -ENOENT)
+		v = 0;
+	if (atomic_read(&za->zic_num) != v) {
+		CERROR("%s: INVALID ACCOUNTING FOR %llu %d != %lld: rc = %d\n",
+		       d->hcb_osd->od_svname, za->zic_id,
+		       atomic_read(&za->zic_num), v, rc);
+		/* XXX: to catch with automated testing */
+		LBUG();
+	}
+#else
+#warning "remove this additional check before release"
+#endif
+
+	cfs_hash_bd_del_locked(hs, bd, node);
+	OBD_FREE_PTR(za);
+
+	return 0;
+}
+
+void osd_zfs_acct_fini(const struct lu_env *env, struct osd_device *o)
+{
+	struct hash_cbdata	cbdata;
+
+	cbdata.hcb_osd = o;
+
+	/* release object accounting cache (owners) */
+	cbdata.hcb_zapid = o->od_iusr_oid;
+
+	if (o->od_acct_usr) {
+		cfs_hash_for_each_safe(o->od_acct_usr, osd_zfs_delete_item,
+				       &cbdata);
+		cfs_hash_putref(o->od_acct_usr);
+		o->od_acct_usr = NULL;
+	}
+
+	/* release object accounting cache (groups) */
+	cbdata.hcb_zapid = o->od_igrp_oid;
+
+	if (o->od_acct_grp) {
+		cfs_hash_for_each_safe(o->od_acct_grp, osd_zfs_delete_item,
+				       &cbdata);
+		cfs_hash_putref(o->od_acct_grp);
+		o->od_acct_grp = NULL;
+	}
+}
+
+static int osd_zfs_commit_item(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+			       struct hlist_node *node, void *data)
+{
+	struct hash_cbdata	*d = data;
+	struct osd_device	*osd = d->hcb_osd;
+	struct zfs_id_change	*za;
+	int			 rc;
+
+	za = hlist_entry(node, struct zfs_id_change, zic_hash);
+
+	rc = -zap_increment_int(osd->od_objset.os, d->hcb_zapid, za->zic_id,
+				atomic_read(&za->zic_num), d->hcb_tx);
+	if (unlikely(rc != 0))
+		CERROR("%s: quota update for ID "LPU64" failed: rc = %d\n",
+		       osd->od_svname, za->zic_id, rc);
+
+	cfs_hash_bd_del_locked(hs, bd, node);
+	OBD_FREE_PTR(za);
+
+	return 0;
+}
+
+/*
+ * this function is called as part of the txg commit procedure,
+ * when no more normal changes are allowed in this txg.
+ * we go over all the changes cached in the per-txg structure
+ * and apply them to the actual ZAPs
+ */
+static void osd_zfs_acct_update(void *arg, void *arg2, dmu_tx_t *tx)
+{
+	struct osd_zfs_acct_txg	*zat = arg;
+	struct osd_device	*osd = zat->zat_osd;
+	struct hash_cbdata	 cbdata;
+
+	cbdata.hcb_osd = osd;
+	cbdata.hcb_tx = tx;
+
+	CDEBUG(D_OTHER, "COMMIT %llu on %s\n", tx->tx_txg, osd->od_svname);
+
+	/* apply changes related to the owners */
+	cbdata.hcb_zapid = osd->od_iusr_oid;
+	cfs_hash_for_each_safe(zat->zat_usr, osd_zfs_commit_item, &cbdata);
+
+	/* apply changes related to the groups */
+	cbdata.hcb_zapid = osd->od_igrp_oid;
+	cfs_hash_for_each_safe(zat->zat_grp, osd_zfs_commit_item, &cbdata);
+
+	cfs_hash_putref(zat->zat_usr);
+	cfs_hash_putref(zat->zat_grp);
+
+	OBD_FREE_PTR(zat);
+}
+
+static int osd_zfs_acct_check(void *arg1, void *arg2, dmu_tx_t *tx)
+{
+	/* the check function isn't used currently */
+	return 0;
+}
+
+/*
+ * if any change to the object accounting is going to happen,
+ * we create one structure per txg to track all the changes
+ * and register a special routine to be called as part of the
+ * txg commit procedure.
+ */
+int osd_zfs_acct_trans_start(const struct lu_env *env, struct osd_thandle *oh)
+{
+	struct osd_device	*osd = osd_dt_dev(oh->ot_super.th_dev);
+	struct osd_zfs_acct_txg	*ac = NULL;
+	int			 rc = 0, add_work = 0;
+
+	if (likely(oh->ot_tx->tx_txg == osd->od_known_txg)) {
+		/* already created */
+		return 0;
+	}
+
+	OBD_ALLOC_PTR(ac);
+	if (unlikely(ac == NULL))
+		return -ENOMEM;
+
+	ac->zat_usr = cfs_hash_create("usr", 4, 4, 4, 0, 0, 0,
+				      &acct_hash_operations,
+				      ACCT_HASH_OPS);
+	if (unlikely(ac->zat_usr == NULL)) {
+		CERROR("%s: can't allocate hash for accounting\n",
+		       osd->od_svname);
+		GOTO(out, rc = -ENOMEM);
+	}
+
+	ac->zat_grp = cfs_hash_create("grp", 4, 4, 4, 0, 0, 0,
+				      &acct_hash_operations,
+				      ACCT_HASH_OPS);
+	if (unlikely(ac->zat_grp == NULL)) {
+		CERROR("%s: can't allocate hash for accounting\n",
+		       osd->od_svname);
+		GOTO(out, rc = -ENOMEM);
+	}
+
+	spin_lock(&osd->od_known_txg_lock);
+	if (oh->ot_tx->tx_txg != osd->od_known_txg) {
+		osd->od_acct_delta = ac;
+		osd->od_known_txg = oh->ot_tx->tx_txg;
+		add_work = 1;
+	}
+	spin_unlock(&osd->od_known_txg_lock);
+
+	/* schedule a callback to be run in the context of the txg
+	 * once the latter is closed and syncing */
+	if (add_work) {
+		spa_t *spa = dmu_objset_spa(osd->od_objset.os);
+		LASSERT(ac->zat_osd == NULL);
+		ac->zat_osd = osd;
+		dsl_sync_task_do_nowait(spa_get_dsl(spa),
+					osd_zfs_acct_check,
+					osd_zfs_acct_update,
+					ac, NULL, 128, oh->ot_tx);
+
+		/* not to be freed now */
+		ac = NULL;
+	}
+
+out:
+	if (ac != NULL) {
+		/* another thread has installed a new structure already */
+		if (ac->zat_usr)
+			cfs_hash_putref(ac->zat_usr);
+		if (ac->zat_grp)
+			cfs_hash_putref(ac->zat_grp);
+		OBD_FREE_PTR(ac);
+	}
+
+	return rc;
+}
+
+void osd_zfs_acct_uid(const struct lu_env *env, struct osd_device *osd,
+		      __u64 uid, int delta, struct osd_thandle *oh)
+{
+	int rc;
+
+	/* add per-txg job to update accounting */
+	rc = osd_zfs_acct_trans_start(env, oh);
+	if (unlikely(rc != 0))
+		return;
+
+	/* maintain per-OSD cached value */
+	osd_zfs_acct_cache_init(env, osd, osd->od_acct_usr,
+				osd->od_iusr_oid, uid, delta, oh);
+
+	/* maintain per-TXG delta */
+	osd_zfs_acct_id(env, osd->od_acct_delta->zat_usr, uid, delta, oh);
+}
+
+void osd_zfs_acct_gid(const struct lu_env *env, struct osd_device *osd,
+		      __u64 gid, int delta, struct osd_thandle *oh)
+{
+	int rc;
+
+	/* add per-txg job to update accounting */
+	rc = osd_zfs_acct_trans_start(env, oh);
+	if (unlikely(rc != 0))
+		return;
+
+	/* maintain per-OSD cached value */
+	osd_zfs_acct_cache_init(env, osd, osd->od_acct_grp,
+				osd->od_igrp_oid, gid, delta, oh);
+
+	/* maintain per-TXG delta */
+	osd_zfs_acct_id(env, osd->od_acct_delta->zat_grp, gid, delta, oh);
+}
+
 /**
  * Space Accounting Management
  */
@@ -63,18 +556,19 @@ uint64_t osd_quota_fid2dmu(const struct lu_fid *fid)
  * \retval -ve	- failure
  */
 static int osd_acct_index_lookup(const struct lu_env *env,
-				struct dt_object *dtobj,
-				struct dt_rec *dtrec,
-				const struct dt_key *dtkey,
-				struct lustre_capa *capa)
+				 struct dt_object *dtobj,
+				 struct dt_rec *dtrec,
+				 const struct dt_key *dtkey,
+				 struct lustre_capa *capa)
 {
 	struct osd_thread_info	*info = osd_oti_get(env);
 	char			*buf = info->oti_buf;
 	struct lquota_acct_rec	*rec = (struct lquota_acct_rec *)dtrec;
 	struct osd_object	*obj = osd_dt_obj(dtobj);
 	struct osd_device	*osd = osd_obj2dev(obj);
-	int			 rc;
 	uint64_t		 oid;
+	struct zfs_id_change	*za = NULL;
+	int			 rc;
 	ENTRY;
 
 	rec->bspace = rec->ispace = 0;
@@ -109,8 +603,20 @@ static int osd_acct_index_lookup(const struct lu_env *env,
 	/* as for inode accounting, it is not maintained by DMU, so we just
 	 * use our own ZAP to track inode usage */
-	rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
-			 buf, sizeof(uint64_t), 1, &rec->ispace);
+	if (oid == DMU_USERUSED_OBJECT) {
+		za = osd_zfs_lookup_by_id(osd->od_acct_usr,
+					  *((__u64 *)dtkey));
+	} else if (oid == DMU_GROUPUSED_OBJECT) {
+		za = osd_zfs_lookup_by_id(osd->od_acct_grp,
+					  *((__u64 *)dtkey));
+	}
+	if (za) {
+		rec->ispace = atomic_read(&za->zic_num);
+	} else {
+		rc = -zap_lookup(osd->od_objset.os, obj->oo_db->db_object,
+				 buf, sizeof(uint64_t), 1, &rec->ispace);
+	}
+
 	if (rc == -ENOENT)
 		/* user/group has not created any file yet */
 		CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
 		       osd->od_svname, buf);
@@ -149,6 +655,13 @@ static struct dt_it *osd_it_acct_init(const struct lu_env *env,
 	memset(it, 0, sizeof(*it));
 	it->oiq_oid = osd_quota_fid2dmu(lu_object_fid(lo));
 
+	if (it->oiq_oid == DMU_GROUPUSED_OBJECT)
+		it->oiq_hash = osd->od_acct_grp;
+	else if (it->oiq_oid == DMU_USERUSED_OBJECT)
+		it->oiq_hash = osd->od_acct_usr;
+	else
+		LBUG();
+
 	/* initialize zap cursor */
 	rc = -udmu_zap_cursor_init(&it->oiq_zc, &osd->od_objset, it->oiq_oid,0);
 	if (rc)
@@ -252,6 +765,7 @@ static int osd_it_acct_rec(const struct lu_env *env,
 	struct osd_object	*obj = it->oiq_obj;
 	struct osd_device	*osd = osd_obj2dev(obj);
 	int			 bytes_read;
+	struct zfs_id_change	*za;
 	int			 rc;
 	ENTRY;
 
@@ -280,16 +794,23 @@ static int osd_it_acct_rec(const struct lu_env *env,
 
 	/* inode accounting is not maintained by DMU, so we use our own ZAP to
 	 * track inode usage */
-	rc = -zap_lookup(osd->od_objset.os, it->oiq_obj->oo_db->db_object,
-			 buf, sizeof(uint64_t), 1, &rec->ispace);
-	if (rc == -ENOENT)
-		/* user/group has not created any file yet */
-		CDEBUG(D_QUOTA, "%s: id %s not found in accounting ZAP\n",
-		       osd->od_svname, buf);
-	else if (rc)
-		RETURN(rc);
+	za = osd_zfs_lookup_by_id(it->oiq_hash, it->oiq_id);
+	if (za != NULL) {
+		/* found in the cache */
+		rec->ispace = atomic_read(&za->zic_num);
+	} else {
+		rc = -zap_lookup(osd->od_objset.os,
+				 it->oiq_obj->oo_db->db_object,
+				 buf, sizeof(uint64_t), 1, &rec->ispace);
+		if (rc == -ENOENT) {
+			/* user/group has not created any file yet */
+			CDEBUG(D_QUOTA, "%s: id %s not found in ZAP\n",
+			       osd->od_svname, buf);
+			rc = 0;
+		}
+	}
 
-	RETURN(0);
+	RETURN(rc);
 }
 
 /**
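
For readers new to the mechanism above, a minimal, self-contained C sketch of
the batching idea: accumulate per-ID deltas in memory while a transaction
group is open, then fold them into the persistent counters exactly once when
the group commits. A plain array stands in for the cfs_hash tables and a
direct call stands in for the dsl_sync_task_do_nowait() callback; all names
here (acct_id, txg_commit, disk_zap) are hypothetical and only illustrate the
flow, not the actual OSD code.

#include <stdio.h>
#include <string.h>

#define MAX_IDS 16

/* hypothetical stand-in for the per-txg delta hash */
struct acct_delta {
	unsigned long long id;		/* UID or GID */
	long long delta;		/* net object count change in this txg */
	int used;
};

static struct acct_delta txg_delta[MAX_IDS];	/* per-txg, memory only */
static long long disk_zap[MAX_IDS];		/* stand-in for the accounting ZAP */

/* called on every create/destroy/chown; cheap, touches memory only */
static void acct_id(unsigned long long id, int delta)
{
	int i;

	for (i = 0; i < MAX_IDS; i++) {
		if (txg_delta[i].used && txg_delta[i].id == id) {
			txg_delta[i].delta += delta;
			return;
		}
	}
	for (i = 0; i < MAX_IDS; i++) {
		if (!txg_delta[i].used) {
			txg_delta[i].used = 1;
			txg_delta[i].id = id;
			txg_delta[i].delta = delta;
			return;
		}
	}
}

/* stand-in for the txg commit callback: one persistent update per
 * ID per txg, instead of one per object operation */
static void txg_commit(void)
{
	int i;

	for (i = 0; i < MAX_IDS; i++) {
		if (!txg_delta[i].used)
			continue;
		disk_zap[txg_delta[i].id % MAX_IDS] += txg_delta[i].delta;
	}
	memset(txg_delta, 0, sizeof(txg_delta));
}

int main(void)
{
	int i;

	/* 1000 creates and 400 destroys for UID 500 within one txg ... */
	for (i = 0; i < 1000; i++)
		acct_id(500, 1);
	for (i = 0; i < 400; i++)
		acct_id(500, -1);

	/* ... collapse into a single persistent update at commit */
	txg_commit();
	printf("UID 500 owns %lld objects\n", disk_zap[500 % MAX_IDS]);
	return 0;
}

The point of the batch is visible in main(): 1400 object operations collapse
into one update of the persistent counter, which is what the patch achieves
with a single zap_increment_int() per ID per txg in osd_zfs_commit_item().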
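
lookup_or_create_by_id() and osd_zfs_acct_cache_init() both rely on the same
allocate-then-find-or-add idiom: allocate the candidate entry outside the
lock, let cfs_hash_bd_findadd_locked() atomically either install it or return
the existing entry, and free the candidate if somebody else won the race. A
generic pthread-based sketch of that idiom, independent of libcfs
(table_find_or_add and the list layout are hypothetical):

#include <pthread.h>
#include <stdlib.h>

struct entry {
	struct entry		*next;
	unsigned long long	 id;
	long long		 count;
};

static struct entry *head;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

/* allocation happens outside the lock; only the insertion is
 * serialized, and the loser of the race frees its candidate */
static struct entry *table_find_or_add(unsigned long long id)
{
	struct entry *e, *cand;

	cand = calloc(1, sizeof(*cand));
	if (cand == NULL)
		return NULL;
	cand->id = id;

	pthread_mutex_lock(&lock);
	for (e = head; e != NULL; e = e->next) {
		if (e->id == id)
			break;
	}
	if (e == NULL) {	/* we won: install our candidate */
		cand->next = head;
		head = cand;
		e = cand;
		cand = NULL;
	}
	pthread_mutex_unlock(&lock);

	free(cand);		/* no-op if we won the race */
	return e;
}

int main(void)
{
	struct entry *e = table_find_or_add(500);

	if (e)
		e->count++;
	return 0;
}

In the patch the serializing role of the mutex is played by
od_known_txg_lock rather than a per-table lock, since the hashes themselves
are created with CFS_HASH_NO_LOCK.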