From c1baab294d7efb10a2e81cefc12dbc8103c8f175 Mon Sep 17 00:00:00 2001 From: Niu Yawei Date: Tue, 11 Sep 2012 03:35:32 -0400 Subject: [PATCH] LU-1842 quota: zfs local enforcement Quota local enforcement for zfs osd. Signed-off-by: Niu Yawei Change-Id: I91d6698cc5a3f1eb42dd7fc9120f83b70d2a7a6f Reviewed-on: http://review.whamcloud.com/3933 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: Fan Yong Reviewed-by: Oleg Drokin --- lustre/osd-zfs/osd_handler.c | 23 ++++++++ lustre/osd-zfs/osd_internal.h | 7 +++ lustre/osd-zfs/osd_io.c | 134 ++++++++++++++++++++++++++++++++++++++++-- lustre/osd-zfs/osd_object.c | 101 ++++++++++++++++++++++++++++++- lustre/osd-zfs/osd_quota.c | 64 +++++++++++++++++++- 5 files changed, 321 insertions(+), 8 deletions(-) diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c index c34ab99..6bd4bf4 100644 --- a/lustre/osd-zfs/osd_handler.c +++ b/lustre/osd-zfs/osd_handler.c @@ -131,6 +131,7 @@ static void osd_trans_commit_cb(void *cb_data, int error) { struct osd_thandle *oh = cb_data; struct thandle *th = &oh->ot_super; + struct osd_device *osd = osd_dt_dev(th->th_dev); struct lu_device *lud = &th->th_dev->dd_lu_dev; struct dt_txn_commit_cb *dcb, *tmp; @@ -151,6 +152,14 @@ static void osd_trans_commit_cb(void *cb_data, int error) cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) dcb->dcb_func(NULL, th, dcb, error); + /* Unlike ldiskfs, zfs updates space accounting at commit time. + * As a consequence, op_end is called only now to inform the quota slave + * component that reserved quota space is now accounted in usage and + * should be released. Quota space won't be adjusted at this point since + * we can't provide a suitable environment. It will be performed + * asynchronously by a lquota thread. 
*/ + qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans); + lu_device_put(lud); th->th_dev = NULL; lu_context_exit(&th->th_ctx); @@ -234,10 +243,23 @@ static int osd_trans_stop(const struct lu_env *env, struct thandle *th) LASSERT(oh->ot_tx); dmu_tx_abort(oh->ot_tx); osd_object_sa_dirty_rele(oh); + /* there won't be any commit, release reserved quota space now, + * if any */ + qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans); OBD_FREE_PTR(oh); RETURN(0); } + /* When doing our own inode accounting, the ZAPs storing per-uid/gid + * usage are updated at operation execution time, so we should call + * qsd_op_end() straight away. Otherwise (for blk accounting maintained + * by ZFS and when #inode is estimated from #blks) accounting is updated + * at commit time and the call to qsd_op_end() must be delayed */ + if (oh->ot_quota_trans.lqt_id_cnt > 0 && + !oh->ot_quota_trans.lqt_ids[0].lqi_is_blk && + !osd->od_quota_iused_est) + qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans); + rc = dt_txn_hook_stop(env, th); if (rc != 0) CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n", @@ -279,6 +301,7 @@ static struct thandle *osd_trans_create(const struct lu_env *env, CFS_INIT_LIST_HEAD(&oh->ot_dcb_list); CFS_INIT_LIST_HEAD(&oh->ot_sa_list); cfs_sema_init(&oh->ot_sa_lock, 1); + memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans)); th = &oh->ot_super; th->th_dev = dt; th->th_result = 0; diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h index 540f2ec..cb9b8a5 100644 --- a/lustre/osd-zfs/osd_internal.h +++ b/lustre/osd-zfs/osd_internal.h @@ -153,6 +153,8 @@ struct osd_thread_info { zap_attribute_t oti_za; dmu_object_info_t oti_doi; struct luz_direntry oti_zde; + + struct lquota_id_info oti_qi; }; extern struct lu_context_key osd_key; @@ -168,6 +170,7 @@ struct osd_thandle { cfs_list_t ot_sa_list; cfs_semaphore_t ot_sa_lock; dmu_tx_t *ot_tx; + struct lquota_trans ot_quota_trans; __u32 ot_write_commit:1, ot_assigned:1; }; @@ 
-268,6 +271,10 @@ int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *); extern const struct dt_index_operations osd_acct_index_ops; uint64_t osd_quota_fid2dmu(const struct lu_fid *fid); extern struct lu_device_operations osd_lu_ops; +int osd_declare_quota(const struct lu_env *env, struct osd_device *osd, + qid_t uid, qid_t gid, long long space, + struct osd_thandle *oh, bool is_blk, int *flags, + bool force); /* * Helpers. diff --git a/lustre/osd-zfs/osd_io.c b/lustre/osd-zfs/osd_io.c index 4dafcfb..3cc3c62 100644 --- a/lustre/osd-zfs/osd_io.c +++ b/lustre/osd-zfs/osd_io.c @@ -116,6 +116,7 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t oid; ENTRY; @@ -142,7 +143,12 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt, dmu_tx_hold_write(oh->ot_tx, oid, pos, size); - RETURN(0); + /* dt_declare_write() is usually called for system objects, such + * as llog or last_rcvd files. We needn't enforce quota on those + * objects, so always set the lqi_space as 0. 
*/ + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); } static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt, @@ -452,16 +458,86 @@ static int osd_write_prep(const struct lu_env *env, struct dt_object *dt, return 0; } +/* Return the space, in bytes, taken by the blocks of the [start, start + size) + * region that aren't mapped (each such block is counted in full) */ +static int osd_count_not_mapped(struct osd_object *obj, uint64_t start, + uint32_t size) +{ + dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)obj->oo_db; + dmu_buf_impl_t *db; + dnode_t *dn; + uint32_t blkshift; + uint64_t end, blkid; + int rc; + ENTRY; + + DB_DNODE_ENTER(dbi); + dn = DB_DNODE(dbi); + + if (dn->dn_maxblkid == 0) { + if (start + size <= dn->dn_datablksz) + GOTO(out, size = 0); + if (start < dn->dn_datablksz) + start = dn->dn_datablksz; + /* assume largest block size */ + blkshift = SPA_MAXBLOCKSHIFT; + } else { + /* blocksize can't change */ + blkshift = dn->dn_datablkshift; + } + + /* compute address of last block */ + end = (start + size - 1) >> blkshift; + /* align start on block boundaries */ + start >>= blkshift; + + /* size is null, can't be mapped */ + if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0) + GOTO(out, size = (end - start + 1) << blkshift); + + /* beyond EOF, can't be mapped */ + if (start > dn->dn_maxblkid) + GOTO(out, size = (end - start + 1) << blkshift); + + size = 0; + for (blkid = start; blkid <= end; blkid++) { + if (blkid == dn->dn_maxblkid) + /* this one is mapped for sure */ + continue; + if (blkid > dn->dn_maxblkid) { + size += (end - blkid + 1) << blkshift; + GOTO(out, size); + } + + rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db); + if (rc) { + /* for ENOENT (block not mapped) and any other errors, + * assume the block isn't mapped */ + size += 1 << blkshift; + continue; + } + dbuf_rele(db, FTAG); + } + + GOTO(out, size); +out: + DB_DNODE_EXIT(dbi); + return size; +} + static int osd_declare_write_commit(const struct lu_env *env, struct
dt_object *dt, struct niobuf_local *lnb, int npages, struct thandle *th) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t offset = 0; uint32_t size = 0; - int i; + int i, rc, flags = 0; + bool ignore_quota = false, synced = false; + long long space = 0; ENTRY; LASSERT(dt_object_exists(dt)); @@ -479,6 +555,14 @@ static int osd_declare_write_commit(const struct lu_env *env, * skipped in osd_write_commit(). Hence we skip pages * with lnb_rc != 0 here too */ continue; + /* ignore quota for the whole request if any page is from + * client cache or written by root. + * + * XXX we could handle this on per-lnb basis as done by + * grant. */ + if ((lnb[i].flags & OBD_BRW_NOQUOTA) || + !(lnb[i].flags & OBD_BRW_SYNC)) + ignore_quota = true; if (size == 0) { /* first valid lnb */ offset = lnb[i].lnb_file_offset; @@ -493,18 +577,55 @@ static int osd_declare_write_commit(const struct lu_env *env, dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + /* estimating space that will be consumed by a write is rather + * complicated with ZFS. As a consequence, we don't account for + * indirect blocks and quota overrun will be adjusted once the + * operation is committed, if required. 
*/ + space += osd_count_not_mapped(obj, offset, size); + offset = lnb->lnb_file_offset; size = lnb->len; } - if (size) + if (size) { dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size); + space += osd_count_not_mapped(obj, offset, size); + } dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */ - RETURN(0); + /* backend zfs filesystem might be configured to store multiple data + * copies */ + space *= osd->od_objset.os->os_copies; + space = toqb(space); + CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota " + "space\n", npages, space); + +retry: + /* acquire quota space if needed */ + rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, space, oh, true, &flags, + ignore_quota); + + if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) { + dt_sync(env, th->th_dev); + synced = true; + CDEBUG(D_QUOTA, "retry after sync\n"); + flags = 0; + goto retry; + } + + /* we need only to store the overquota flags in the first lnb for + * now, once we support multiple objects BRW, this code needs be + * revised. */ + if (flags & QUOTA_FL_OVER_USRQUOTA) + lnb[0].flags |= OBD_BRW_OVER_USRQUOTA; + if (flags & QUOTA_FL_OVER_GRPQUOTA) + lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA; + + RETURN(rc); } static int osd_write_commit(const struct lu_env *env, struct dt_object *dt, @@ -697,6 +818,7 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, __u64 start, __u64 end, struct thandle *handle) { struct osd_object *obj = osd_dt_obj(dt); + struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; __u64 len; ENTRY; @@ -720,7 +842,9 @@ static int osd_declare_punch(const struct lu_env *env, struct dt_object *dt, /* ... 
and we'll modify size attribute */ dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); - RETURN(0); + RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, + false)); } diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c index bd12851..6b1373d 100644 --- a/lustre/osd-zfs/osd_object.c +++ b/lustre/osd-zfs/osd_object.c @@ -458,6 +458,7 @@ static int osd_declare_object_destroy(const struct lu_env *env, struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; uint64_t zapid; + int rc; ENTRY; LASSERT(th != NULL); @@ -480,7 +481,16 @@ static int osd_declare_object_destroy(const struct lu_env *env, dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf); - RETURN(0); + /* one less inode */ + rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, -1, oh, false, NULL, false); + if (rc) + RETURN(rc); + + /* data to be truncated */ + rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid, + obj->oo_attr.la_gid, 0, oh, true, NULL, false); + RETURN(rc); } int __osd_object_free(udmu_objset_t *uos, uint64_t oid, dmu_tx_t *tx) @@ -732,15 +742,78 @@ static int osd_attr_get(const struct lu_env *env, return 0; } +/* Simple wrapper on top of qsd API which implements quota transfer for osd + * setattr needs. As a reminder, only the root user can change ownership of + * a file, that's why EDQUOT & EINPROGRESS errors are discarded */ +static inline int qsd_transfer(const struct lu_env *env, + struct qsd_instance *qsd, + struct lquota_trans *trans, int qtype, + __u64 orig_id, __u64 new_id, __u64 bspace, + struct lquota_id_info *qi) +{ + int rc; + + if (unlikely(qsd == NULL)) + return 0; + + LASSERT(qtype >= 0 && qtype < MAXQUOTAS); + qi->lqi_type = qtype; + + /* inode accounting */ + qi->lqi_is_blk = false; + + /* one more inode for the new owner ... 
*/ + qi->lqi_id.qid_uid = new_id; + qi->lqi_space = 1; + rc = qsd_op_begin(env, qsd, trans, qi, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + return rc; + + /* and one less inode for the current id */ + qi->lqi_id.qid_uid = orig_id; + qi->lqi_space = -1; + rc = qsd_op_begin(env, qsd, trans, qi, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + return rc; + + /* block accounting */ + qi->lqi_is_blk = true; + + /* more blocks for the new owner ... */ + qi->lqi_id.qid_uid = new_id; + qi->lqi_space = bspace; + rc = qsd_op_begin(env, qsd, trans, qi, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + if (rc) + return rc; + + /* and finally less blocks for the current owner */ + qi->lqi_id.qid_uid = orig_id; + qi->lqi_space = -bspace; + rc = qsd_op_begin(env, qsd, trans, qi, NULL); + if (rc == -EDQUOT || rc == -EINPROGRESS) + rc = 0; + return rc; +} + static int osd_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *handle) { + struct osd_thread_info *info = osd_oti_get(env); char *buf = osd_oti_get(env)->oti_str; struct osd_object *obj = osd_dt_obj(dt); struct osd_device *osd = osd_obj2dev(obj); struct osd_thandle *oh; + uint64_t bspace; + uint32_t blksize; + int rc; ENTRY; if (!dt_object_exists(dt)) { @@ -756,15 +829,38 @@ static int osd_declare_attr_set(const struct lu_env *env, LASSERT(obj->oo_sa_hdl != NULL); dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0); + sa_object_size(obj->oo_sa_hdl, &blksize, &bspace); + bspace = toqb(bspace * blksize); + if (attr && attr->la_valid & LA_UID) { /* account for user inode tracking ZAP update */ dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid); dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf); + + /* quota enforcement for user */ + if (attr->la_uid != obj->oo_attr.la_uid) { + rc = qsd_transfer(env, osd->od_quota_slave, + &oh->ot_quota_trans, USRQUOTA, + obj->oo_attr.la_uid, attr->la_uid, + bspace, &info->oti_qi); 
+ if (rc) + RETURN(rc); + } } if (attr && attr->la_valid & LA_GID) { /* account for user inode tracking ZAP update */ dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid); dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf); + + /* quota enforcement for group */ + if (attr->la_gid != obj->oo_attr.la_gid) { + rc = qsd_transfer(env, osd->od_quota_slave, + &oh->ot_quota_trans, GRPQUOTA, + obj->oo_attr.la_gid, attr->la_gid, + bspace, &info->oti_qi); + if (rc) + RETURN(rc); + } } RETURN(0); @@ -982,7 +1078,8 @@ static int osd_declare_object_create(const struct lu_env *env, dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE); - RETURN(0); + RETURN(osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh, + false, NULL, false)); } int __osd_attr_init(const struct lu_env *env, udmu_objset_t *uos, diff --git a/lustre/osd-zfs/osd_quota.c b/lustre/osd-zfs/osd_quota.c index 1f9e2d2..3c2fa2b 100644 --- a/lustre/osd-zfs/osd_quota.c +++ b/lustre/osd-zfs/osd_quota.c @@ -395,5 +395,67 @@ const struct dt_index_operations osd_acct_index_ops = { /** * Quota Enforcement Management - * TODO */ + +/* + * Wrapper for qsd_op_begin(), called for the user id first, then the group id. + * + * \param env - the environment passed by the caller + * \param osd - is the osd_device + * \param uid - user id of the inode + * \param gid - group id of the inode + * \param space - how many blocks/inodes will be consumed/released + * \param oh - osd transaction handle + * \param is_blk - block quota or inode quota? 
+ * \param flags - if the operation is write, return no user quota, no + * group quota, or sync commit flags to the caller + * \param force - set to true when changes are performed by root user and thus + * can't fail with EDQUOT + * + * \retval 0 - success + * \retval -ve - failure + */ +int osd_declare_quota(const struct lu_env *env, struct osd_device *osd, + qid_t uid, qid_t gid, long long space, + struct osd_thandle *oh, bool is_blk, int *flags, + bool force) +{ + struct osd_thread_info *info = osd_oti_get(env); + struct lquota_id_info *qi = &info->oti_qi; + struct qsd_instance *qsd = osd->od_quota_slave; + int rcu, rcg; /* user & group rc */ + ENTRY; + + if (unlikely(qsd == NULL)) + /* quota slave instance hasn't been allocated yet */ + RETURN(0); + + /* let's start with user quota */ + qi->lqi_id.qid_uid = uid; + qi->lqi_type = USRQUOTA; + qi->lqi_space = space; + qi->lqi_is_blk = is_blk; + rcu = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags); + + if (force && (rcu == -EDQUOT || rcu == -EINPROGRESS)) + /* ignore EDQUOT & EINPROGRESS when changes are done by root */ + rcu = 0; + + /* For non-fatal error, we want to continue to get the noquota flags + * for group id. This is only for commit write, which has @flags passed + * in. See osd_declare_write_commit(). + * When force is set to true, we also want to proceed with the gid */ + if (rcu && (rcu != -EDQUOT || flags == NULL)) + RETURN(rcu); + + /* and now group quota */ + qi->lqi_id.qid_gid = gid; + qi->lqi_type = GRPQUOTA; + rcg = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags); + + if (force && (rcg == -EDQUOT || rcg == -EINPROGRESS)) + /* as before, ignore EDQUOT & EINPROGRESS for root */ + rcg = 0; + + RETURN(rcu ? rcu : rcg); +} -- 1.8.3.1