Quota local enforcement for zfs osd.
Signed-off-by: Niu Yawei <niu@whamcloud.com>
Change-Id: I91d6698cc5a3f1eb42dd7fc9120f83b70d2a7a6f
Reviewed-on: http://review.whamcloud.com/3933
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
{
struct osd_thandle *oh = cb_data;
struct thandle *th = &oh->ot_super;
+ struct osd_device *osd = osd_dt_dev(th->th_dev);
struct lu_device *lud = &th->th_dev->dd_lu_dev;
struct dt_txn_commit_cb *dcb, *tmp;
cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage)
dcb->dcb_func(NULL, th, dcb, error);
+ /* Unlike ldiskfs, zfs updates space accounting at commit time.
+ * As a consequence, op_end is called only now to inform the quota slave
+ * component that reserved quota space is now accounted in usage and
+ * should be released. Quota space won't be adjusted at this point since
+ * we can't provide a suitable environment. It will be performed
+ * asynchronously by a lquota thread. */
+ qsd_op_end(NULL, osd->od_quota_slave, &oh->ot_quota_trans);
+
lu_device_put(lud);
th->th_dev = NULL;
lu_context_exit(&th->th_ctx);
LASSERT(oh->ot_tx);
dmu_tx_abort(oh->ot_tx);
osd_object_sa_dirty_rele(oh);
+ /* there won't be any commit, release reserved quota space now,
+ * if any */
+ qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
OBD_FREE_PTR(oh);
RETURN(0);
}
+ /* When doing our own inode accounting, the ZAPs storing per-uid/gid
+ * usage are updated at operation execution time, so we should call
+ * qsd_op_end() straight away. Otherwise (for blk accounting maintained
+ * by ZFS and when #inode is estimated from #blks) accounting is updated
+ * at commit time and the call to qsd_op_end() must be delayed */
+ if (oh->ot_quota_trans.lqt_id_cnt > 0 &&
+ !oh->ot_quota_trans.lqt_ids[0].lqi_is_blk &&
+ !osd->od_quota_iused_est)
+ qsd_op_end(env, osd->od_quota_slave, &oh->ot_quota_trans);
+
rc = dt_txn_hook_stop(env, th);
if (rc != 0)
CDEBUG(D_OTHER, "%s: transaction hook failed: rc = %d\n",
CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
CFS_INIT_LIST_HEAD(&oh->ot_sa_list);
cfs_sema_init(&oh->ot_sa_lock, 1);
+ memset(&oh->ot_quota_trans, 0, sizeof(oh->ot_quota_trans));
th = &oh->ot_super;
th->th_dev = dt;
th->th_result = 0;
zap_attribute_t oti_za;
dmu_object_info_t oti_doi;
struct luz_direntry oti_zde;
+
+ struct lquota_id_info oti_qi;
};
extern struct lu_context_key osd_key;
cfs_list_t ot_sa_list;
cfs_semaphore_t ot_sa_lock;
dmu_tx_t *ot_tx;
+ struct lquota_trans ot_quota_trans;
__u32 ot_write_commit:1,
ot_assigned:1;
};
extern const struct dt_index_operations osd_acct_index_ops;
uint64_t osd_quota_fid2dmu(const struct lu_fid *fid);
extern struct lu_device_operations osd_lu_ops;
+int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
+ qid_t uid, qid_t gid, long long space,
+ struct osd_thandle *oh, bool is_blk, int *flags,
+ bool force);
/*
* Helpers.
struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
uint64_t oid;
ENTRY;
dmu_tx_hold_write(oh->ot_tx, oid, pos, size);
- RETURN(0);
+ /* dt_declare_write() is usually called for system objects, such
+ * as llog or last_rcvd files. We needn't enforce quota on those
+ * objects, so always set the lqi_space as 0. */
+ RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+ obj->oo_attr.la_gid, 0, oh, true, NULL,
+ false));
}
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
return 0;
}
+/* Return number of blocks that aren't mapped in the [start, start + size]
+ * region */
+static int osd_count_not_mapped(struct osd_object *obj, uint64_t start,
+ uint32_t size)
+{
+ dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)obj->oo_db;
+ dmu_buf_impl_t *db;
+ dnode_t *dn;
+ uint32_t blkshift;
+ uint64_t end, blkid;
+ int rc;
+ ENTRY;
+
+ /* pin the dnode behind the bonus buffer while we walk its block map */
+ DB_DNODE_ENTER(dbi);
+ dn = DB_DNODE(dbi);
+
+ if (dn->dn_maxblkid == 0) {
+ /* The object still fits in a single (possibly growing) block;
+ * anything within the current data block is already mapped. */
+ if (start + size <= dn->dn_datablksz)
+ GOTO(out, size = 0);
+ if (start < dn->dn_datablksz)
+ start = dn->dn_datablksz;
+ /* assume largest block size */
+ blkshift = SPA_MAXBLOCKSHIFT;
+ } else {
+ /* blocksize can't change */
+ blkshift = dn->dn_datablkshift;
+ }
+
+ /* compute address of last block */
+ end = (start + size - 1) >> blkshift;
+ /* align start on block boundaries */
+ start >>= blkshift;
+
+ /* size is null, can't be mapped */
+ if (obj->oo_attr.la_size == 0 || dn->dn_maxblkid == 0)
+ GOTO(out, size = (end - start + 1) << blkshift);
+
+ /* beyond EOF, can't be mapped */
+ if (start > dn->dn_maxblkid)
+ GOTO(out, size = (end - start + 1) << blkshift);
+
+ /* Walk each block in the range and probe the dbuf cache/on-disk
+ * state; the result is a conservative over-estimate since probe
+ * failures are counted as "not mapped". */
+ size = 0;
+ for (blkid = start; blkid <= end; blkid++) {
+ if (blkid == dn->dn_maxblkid)
+ /* this one is mapped for sure */
+ continue;
+ if (blkid > dn->dn_maxblkid) {
+ /* everything from here on is past the last mapped
+ * block, count the remainder in one shot */
+ size += (end - blkid + 1) << blkshift;
+ GOTO(out, size);
+ }
+
+ rc = dbuf_hold_impl(dn, 0, blkid, TRUE, FTAG, &db);
+ if (rc) {
+ /* for ENOENT (block not mapped) and any other errors,
+ * assume the block isn't mapped */
+ size += 1 << blkshift;
+ continue;
+ }
+ dbuf_rele(db, FTAG);
+ }
+
+ GOTO(out, size);
+out:
+ DB_DNODE_EXIT(dbi);
+ return size;
+}
+
static int osd_declare_write_commit(const struct lu_env *env,
struct dt_object *dt,
struct niobuf_local *lnb, int npages,
struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
uint64_t offset = 0;
uint32_t size = 0;
- int i;
+ int i, rc, flags = 0;
+ bool ignore_quota = false, synced = false;
+ long long space = 0;
ENTRY;
LASSERT(dt_object_exists(dt));
* skipped in osd_write_commit(). Hence we skip pages
* with lnb_rc != 0 here too */
continue;
+ /* ignore quota for the whole request if any page is from
+ * client cache or written by root.
+ *
+ * XXX we could handle this on per-lnb basis as done by
+ * grant. */
+ if ((lnb[i].flags & OBD_BRW_NOQUOTA) ||
+ !(lnb[i].flags & OBD_BRW_SYNC))
+ ignore_quota = true;
if (size == 0) {
/* first valid lnb */
offset = lnb[i].lnb_file_offset;
dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
+ /* estimating space that will be consumed by a write is rather
+ * complicated with ZFS. As a consequence, we don't account for
+ * indirect blocks and quota overrun will be adjusted once the
+ * operation is committed, if required. */
+ space += osd_count_not_mapped(obj, offset, size);
+
offset = lnb->lnb_file_offset;
size = lnb->len;
}
- if (size)
+ if (size) {
dmu_tx_hold_write(oh->ot_tx, obj->oo_db->db_object, offset,size);
+ space += osd_count_not_mapped(obj, offset, size);
+ }
dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
- RETURN(0);
+ /* backend zfs filesystem might be configured to store multiple data
+ * copies */
+ space *= osd->od_objset.os->os_copies;
+ space = toqb(space);
+ CDEBUG(D_QUOTA, "writting %d pages, reserving "LPD64"K of quota "
+ "space\n", npages, space);
+
+retry:
+ /* acquire quota space if needed */
+ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+ obj->oo_attr.la_gid, space, oh, true, &flags,
+ ignore_quota);
+
+ if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) {
+ dt_sync(env, th->th_dev);
+ synced = true;
+ CDEBUG(D_QUOTA, "retry after sync\n");
+ flags = 0;
+ goto retry;
+ }
+
+ /* We only need to store the overquota flags in the first lnb for
+ * now; once we support multiple-object BRW, this code will need to
+ * be revised. */
+ if (flags & QUOTA_FL_OVER_USRQUOTA)
+ lnb[0].flags |= OBD_BRW_OVER_USRQUOTA;
+ if (flags & QUOTA_FL_OVER_GRPQUOTA)
+ lnb[0].flags |= OBD_BRW_OVER_GRPQUOTA;
+
+ RETURN(rc);
}
static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
__u64 start, __u64 end, struct thandle *handle)
{
struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
__u64 len;
ENTRY;
/* ... and we'll modify size attribute */
dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
- RETURN(0);
+ RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+ obj->oo_attr.la_gid, 0, oh, true, NULL,
+ false));
}
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
uint64_t zapid;
+ int rc;
ENTRY;
LASSERT(th != NULL);
dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, 0, buf);
- RETURN(0);
+ /* one less inode */
+ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+ obj->oo_attr.la_gid, -1, oh, false, NULL, false);
+ if (rc)
+ RETURN(rc);
+
+ /* data to be truncated */
+ rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
+ obj->oo_attr.la_gid, 0, oh, true, NULL, false);
+ RETURN(rc);
}
int __osd_object_free(udmu_objset_t *uos, uint64_t oid, dmu_tx_t *tx)
return 0;
}
+/* Simple wrapper on top of the qsd API which implements quota transfer for
+ * osd setattr needs. As a reminder, only the root user can change ownership
+ * of a file, that's why EDQUOT & EINPROGRESS errors are discarded.
+ *
+ * \param env     - the environment passed by the caller
+ * \param qsd     - quota slave instance, may be NULL if not set up yet
+ * \param trans   - quota transaction the changes are accounted against
+ * \param qtype   - USRQUOTA or GRPQUOTA
+ * \param orig_id - current owner id, loses one inode and \a bspace blocks
+ * \param new_id  - new owner id, gains one inode and \a bspace blocks
+ * \param bspace  - block space (in qunit) transferred between the two ids
+ * \param qi      - scratch lquota_id_info, typically from osd_thread_info */
+static inline int qsd_transfer(const struct lu_env *env,
+ struct qsd_instance *qsd,
+ struct lquota_trans *trans, int qtype,
+ __u64 orig_id, __u64 new_id, __u64 bspace,
+ struct lquota_id_info *qi)
+{
+ int rc;
+
+ if (unlikely(qsd == NULL))
+ /* quota slave instance hasn't been allocated yet */
+ return 0;
+
+ LASSERT(qtype >= 0 && qtype < MAXQUOTAS);
+ qi->lqi_type = qtype;
+
+ /* inode accounting */
+ qi->lqi_is_blk = false;
+
+ /* one more inode for the new owner ... */
+ qi->lqi_id.qid_uid = new_id;
+ qi->lqi_space = 1;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* and one less inode for the current id */
+ qi->lqi_id.qid_uid = orig_id;
+ qi->lqi_space = -1;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* block accounting */
+ qi->lqi_is_blk = true;
+
+ /* more blocks for the new owner ... */
+ qi->lqi_id.qid_uid = new_id;
+ qi->lqi_space = bspace;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ if (rc)
+ return rc;
+
+ /* and finally less blocks for the current owner */
+ qi->lqi_id.qid_uid = orig_id;
+ qi->lqi_space = -bspace;
+ rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+ if (rc == -EDQUOT || rc == -EINPROGRESS)
+ rc = 0;
+ return rc;
+}
+
static int osd_declare_attr_set(const struct lu_env *env,
struct dt_object *dt,
const struct lu_attr *attr,
struct thandle *handle)
{
+ struct osd_thread_info *info = osd_oti_get(env);
char *buf = osd_oti_get(env)->oti_str;
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
+ uint64_t bspace;
+ uint32_t blksize;
+ int rc;
ENTRY;
if (!dt_object_exists(dt)) {
LASSERT(obj->oo_sa_hdl != NULL);
dmu_tx_hold_sa(oh->ot_tx, obj->oo_sa_hdl, 0);
+ sa_object_size(obj->oo_sa_hdl, &blksize, &bspace);
+ bspace = toqb(bspace * blksize);
+
if (attr && attr->la_valid & LA_UID) {
/* account for user inode tracking ZAP update */
dmu_tx_hold_bonus(oh->ot_tx, osd->od_iusr_oid);
dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, TRUE, buf);
+
+ /* quota enforcement for user */
+ if (attr->la_uid != obj->oo_attr.la_uid) {
+ rc = qsd_transfer(env, osd->od_quota_slave,
+ &oh->ot_quota_trans, USRQUOTA,
+ obj->oo_attr.la_uid, attr->la_uid,
+ bspace, &info->oti_qi);
+ if (rc)
+ RETURN(rc);
+ }
}
if (attr && attr->la_valid & LA_GID) {
/* account for user inode tracking ZAP update */
dmu_tx_hold_bonus(oh->ot_tx, osd->od_igrp_oid);
dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, TRUE, buf);
+
+ /* quota enforcement for group */
+ if (attr->la_gid != obj->oo_attr.la_gid) {
+ rc = qsd_transfer(env, osd->od_quota_slave,
+ &oh->ot_quota_trans, GRPQUOTA,
+ obj->oo_attr.la_gid, attr->la_gid,
+ bspace, &info->oti_qi);
+ if (rc)
+ RETURN(rc);
+ }
}
RETURN(0);
dmu_tx_hold_sa_create(oh->ot_tx, ZFS_SA_BASE_ATTR_SIZE);
- RETURN(0);
+ RETURN(osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
+ false, NULL, false));
}
int __osd_attr_init(const struct lu_env *env, udmu_objset_t *uos,
/**
* Quota Enforcement Management
- * TODO
*/
+
+/*
+ * Wrapper for qsd_op_begin().
+ *
+ * \param env - the environment passed by the caller
+ * \param osd - is the osd_device
+ * \param uid - user id of the inode
+ * \param gid - group id of the inode
+ * \param space - how many blocks/inodes will be consumed/released
+ * \param oh - osd transaction handle
+ * \param is_blk - block quota or inode quota?
+ * \param flags - if the operation is write, return no user quota, no
+ * group quota, or sync commit flags to the caller
+ * \param force - set to true when changes are performed by the root user
+ * and thus can't fail with EDQUOT
+ *
+ * \retval 0 - success
+ * \retval -ve - failure
+ */
+int osd_declare_quota(const struct lu_env *env, struct osd_device *osd,
+ qid_t uid, qid_t gid, long long space,
+ struct osd_thandle *oh, bool is_blk, int *flags,
+ bool force)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ struct lquota_id_info *qi = &info->oti_qi;
+ struct qsd_instance *qsd = osd->od_quota_slave;
+ int rc_usr, rc_grp;
+ ENTRY;
+
+ /* nothing to enforce until the quota slave instance is set up */
+ if (unlikely(qsd == NULL))
+ RETURN(0);
+
+ /* fill the common part of the id info once, then account the
+ * change against the user id first */
+ qi->lqi_space = space;
+ qi->lqi_is_blk = is_blk;
+ qi->lqi_type = USRQUOTA;
+ qi->lqi_id.qid_uid = uid;
+ rc_usr = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
+
+ if (force && (rc_usr == -EDQUOT || rc_usr == -EINPROGRESS))
+ /* ignore EDQUOT & EINPROGRESS when changes are done by root */
+ rc_usr = 0;
+
+ /* Bail out on any fatal error. We still go on to the group id on
+ * -EDQUOT when the caller asked for the per-type noquota flags
+ * (i.e. @flags != NULL, only commit write does this, see
+ * osd_declare_write_commit()); with force set, EDQUOT/EINPROGRESS
+ * were already cleared above so the group id is processed too. */
+ if (rc_usr && (rc_usr != -EDQUOT || flags == NULL))
+ RETURN(rc_usr);
+
+ /* same game for the group id */
+ qi->lqi_type = GRPQUOTA;
+ qi->lqi_id.qid_gid = gid;
+ rc_grp = qsd_op_begin(env, qsd, &oh->ot_quota_trans, qi, flags);
+
+ if (force && (rc_grp == -EDQUOT || rc_grp == -EINPROGRESS))
+ /* as before, ignore EDQUOT & EINPROGRESS for root */
+ rc_grp = 0;
+
+ RETURN(rc_usr ? rc_usr : rc_grp);
+}